1use crate::{
17 MAX_FETCH_TIMEOUT_IN_MS,
18 MAX_WORKERS,
19 ProposedBatch,
20 Transport,
21 events::{Event, TransmissionRequest, TransmissionResponse},
22 helpers::{Pending, Ready, Storage, WorkerReceiver, fmt_id, max_redundant_requests},
23 spawn_blocking,
24};
25use snarkos_node_bft_ledger_service::LedgerService;
26use snarkvm::{
27 console::prelude::*,
28 ledger::{
29 Transaction,
30 narwhal::{BatchHeader, Data, Transmission, TransmissionID},
31 puzzle::{Solution, SolutionID},
32 },
33};
34
35use anyhow::Context;
36use colored::{ColoredString, Colorize};
37use indexmap::{IndexMap, IndexSet};
38#[cfg(feature = "locktick")]
39use locktick::parking_lot::{Mutex, RwLock};
40#[cfg(not(feature = "locktick"))]
41use parking_lot::{Mutex, RwLock};
42use rand::seq::IteratorRandom;
43use std::{future::Future, net::SocketAddr, sync::Arc, time::Duration};
44use tokio::{sync::oneshot, task::JoinHandle, time::timeout};
45
46#[derive(Clone)]
49pub struct Worker<N: Network> {
50 id: u8,
52 gateway: Arc<dyn Transport<N>>,
54 storage: Storage<N>,
56 ledger: Arc<dyn LedgerService<N>>,
58 proposed_batch: Arc<ProposedBatch<N>>,
60 ready: Arc<RwLock<Ready<N>>>,
62 pending: Arc<Pending<TransmissionID<N>, Transmission<N>>>,
64 handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
66}
67
68impl<N: Network> Worker<N> {
69 pub fn new(
71 id: u8,
72 gateway: Arc<dyn Transport<N>>,
73 storage: Storage<N>,
74 ledger: Arc<dyn LedgerService<N>>,
75 proposed_batch: Arc<ProposedBatch<N>>,
76 ) -> Result<Self> {
77 ensure!(id < MAX_WORKERS, "Invalid worker ID '{id}'");
79 Ok(Self {
81 id,
82 gateway,
83 storage,
84 ledger,
85 proposed_batch,
86 ready: Default::default(),
87 pending: Default::default(),
88 handles: Default::default(),
89 })
90 }
91
92 pub fn run(&self, receiver: WorkerReceiver<N>) {
94 info!("Starting worker instance {} of the memory pool...", self.id);
95 self.start_handlers(receiver);
97 }
98
99 pub const fn id(&self) -> u8 {
101 self.id
102 }
103
104 pub fn pending(&self) -> &Arc<Pending<TransmissionID<N>, Transmission<N>>> {
106 &self.pending
107 }
108}
109
110impl<N: Network> Worker<N> {
111 pub const MAX_TRANSMISSIONS_PER_WORKER: usize =
113 BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH / MAX_WORKERS as usize;
114 pub const MAX_TRANSMISSIONS_PER_WORKER_PING: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH / 10;
116
117 pub fn num_transmissions(&self) -> usize {
119 self.ready.read().num_transmissions()
120 }
121
122 pub fn num_ratifications(&self) -> usize {
124 self.ready.read().num_ratifications()
125 }
126
127 pub fn num_solutions(&self) -> usize {
129 self.ready.read().num_solutions()
130 }
131
132 pub fn num_transactions(&self) -> usize {
134 self.ready.read().num_transactions()
135 }
136}
137
138impl<N: Network> Worker<N> {
139 pub fn transmission_ids(&self) -> IndexSet<TransmissionID<N>> {
141 self.ready.read().transmission_ids()
142 }
143
144 pub fn transmissions(&self) -> IndexMap<TransmissionID<N>, Transmission<N>> {
146 self.ready.read().transmissions()
147 }
148
149 pub fn solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
151 self.ready.read().solutions().into_iter()
152 }
153
154 pub fn transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
156 self.ready.read().transactions().into_iter()
157 }
158}
159
160impl<N: Network> Worker<N> {
161 pub(super) fn clear_solutions(&self) {
163 self.ready.write().clear_solutions()
164 }
165}
166
167impl<N: Network> Worker<N> {
168 fn format_transmission_id(&self, transmission_id: TransmissionID<N>) -> ColoredString {
170 if let Some(checksum) = transmission_id.checksum() {
171 format!("{}:{}", fmt_id(transmission_id), fmt_id(checksum))
173 } else {
174 fmt_id(transmission_id)
175 }
176 .dimmed()
177 }
178
179 pub fn contains_transmission(&self, transmission_id: impl Into<TransmissionID<N>>) -> bool {
181 let transmission_id = transmission_id.into();
182 self.ready.read().contains(transmission_id)
184 || self.proposed_batch.read().as_ref().is_some_and(|p| p.contains_transmission(transmission_id))
185 || self.storage.contains_transmission(transmission_id)
186 || self.ledger.contains_transmission(&transmission_id).unwrap_or(false)
187 }
188
189 pub fn get_transmission(&self, transmission_id: TransmissionID<N>) -> Option<Transmission<N>> {
194 if let Some(transmission) = self.ready.read().get(transmission_id) {
196 return Some(transmission);
197 }
198 if let Some(transmission) = self.storage.get_transmission(transmission_id) {
200 return Some(transmission);
201 }
202 if let Some(transmission) =
204 self.proposed_batch.read().as_ref().and_then(|p| p.get_transmission(transmission_id))
205 {
206 return Some(transmission.clone());
207 }
208 None
209 }
210
211 pub async fn get_or_fetch_transmission(
213 &self,
214 peer_ip: SocketAddr,
215 transmission_id: TransmissionID<N>,
216 ) -> Result<(TransmissionID<N>, Transmission<N>)> {
217 if let Some(transmission) = self.get_transmission(transmission_id) {
219 return Ok((transmission_id, transmission));
220 }
221 let (candidate_id, transmission) = self.send_transmission_request(peer_ip, transmission_id).await?;
223 ensure!(candidate_id == transmission_id, "Invalid transmission ID");
225 Ok((transmission_id, transmission))
227 }
228
229 pub(crate) fn insert_front(&self, key: TransmissionID<N>, value: Transmission<N>) {
231 self.ready.write().insert_front(key, value);
232 }
233
234 pub(crate) fn remove_front(&self) -> Option<(TransmissionID<N>, Transmission<N>)> {
236 self.ready.write().remove_front()
237 }
238
239 pub(crate) fn reinsert(&self, transmission_id: TransmissionID<N>, transmission: Transmission<N>) -> bool {
241 if !self.contains_transmission(transmission_id) {
243 return self.ready.write().insert(transmission_id, transmission);
245 }
246 false
247 }
248
249 pub(crate) fn broadcast_ping(&self) {
251 let transmission_ids = self
253 .ready
254 .read()
255 .transmission_ids()
256 .into_iter()
257 .choose_multiple(&mut rand::thread_rng(), Self::MAX_TRANSMISSIONS_PER_WORKER_PING)
258 .into_iter()
259 .collect::<IndexSet<_>>();
260
261 if !transmission_ids.is_empty() {
263 self.gateway.broadcast(Event::WorkerPing(transmission_ids.into()));
264 }
265 }
266}
267
268impl<N: Network> Worker<N> {
269 fn process_transmission_id_from_ping(&self, peer_ip: SocketAddr, transmission_id: TransmissionID<N>) {
271 if self.contains_transmission(transmission_id) {
273 return;
274 }
275 if self.ready.read().num_transmissions() > Self::MAX_TRANSMISSIONS_PER_WORKER {
278 return;
279 }
280 let self_ = self.clone();
282 tokio::spawn(async move {
283 match self_.send_transmission_request(peer_ip, transmission_id).await {
285 Ok((candidate_id, transmission)) => {
287 if candidate_id == transmission_id {
289 self_.process_transmission_from_peer(peer_ip, transmission_id, transmission);
293 }
294 }
295 Err(e) => {
297 warn!(
298 "Worker {} - Failed to fetch transmission '{}' from '{peer_ip}' (ping) - {e}",
299 self_.id,
300 self_.format_transmission_id(transmission_id),
301 );
302 }
303 }
304 });
305 }
306
307 pub(crate) fn process_transmission_from_peer(
309 &self,
310 peer_ip: SocketAddr,
311 transmission_id: TransmissionID<N>,
312 transmission: Transmission<N>,
313 ) {
314 if self.contains_transmission(transmission_id) {
316 return;
317 }
318 let is_well_formed = match (&transmission_id, &transmission) {
320 (TransmissionID::Solution(_, _), Transmission::Solution(_)) => true,
321 (TransmissionID::Transaction(_, _), Transmission::Transaction(_)) => true,
322 (TransmissionID::Ratification, Transmission::Ratification) => false,
325 _ => false,
327 };
328 if let (TransmissionID::Transaction(tx_id, _), Transmission::Transaction(Data::Object(tx))) =
331 (transmission_id, &transmission)
332 {
333 if tx.is_execute() {
334 let self_ = self.clone();
335 let tx_ = tx.clone();
336 tokio::spawn(async move {
337 let _ = self_.ledger.check_transaction_basic(tx_id, tx_).await;
338 });
339 }
340 }
341 if is_well_formed && self.ready.write().insert(transmission_id, transmission) {
343 trace!(
344 "Worker {} - Added transmission '{}' from '{peer_ip}'",
345 self.id,
346 self.format_transmission_id(transmission_id),
347 );
348 }
349 }
350
351 pub(crate) async fn process_unconfirmed_solution(
359 &self,
360 solution_id: SolutionID<N>,
361 solution: Data<Solution<N>>,
362 ) -> Result<bool> {
363 let transmission = Transmission::Solution(solution.clone());
365 let checksum = solution.to_checksum::<N>()?;
367 let transmission_id = TransmissionID::Solution(solution_id, checksum);
369 self.pending.remove(transmission_id, Some(transmission.clone()));
371 if self.contains_transmission(transmission_id) {
373 return Ok(false);
374 }
375 self.ledger.check_solution_basic(solution_id, solution).await?;
377 if self.ready.write().insert(transmission_id, transmission) {
379 trace!(
380 "Worker {} - Added unconfirmed solution '{}'",
381 self.id,
382 self.format_transmission_id(transmission_id),
383 );
384 }
385 Ok(true)
386 }
387
388 pub(crate) async fn process_unconfirmed_transaction(
395 &self,
396 transaction_id: N::TransactionID,
397 transaction: Data<Transaction<N>>,
398 ) -> Result<bool> {
399 let transmission = Transmission::Transaction(transaction.clone());
401 let checksum = transaction.to_checksum::<N>()?;
403 let transmission_id = TransmissionID::Transaction(transaction_id, checksum);
405 self.pending.remove(transmission_id, Some(transmission.clone()));
407 if self.contains_transmission(transmission_id) {
409 return Ok(false);
410 }
411 let transaction = spawn_blocking!({
413 match transaction {
414 Data::Object(transaction) => Ok(transaction),
415 Data::Buffer(bytes) => Ok(Transaction::<N>::read_le(&mut bytes.take(N::MAX_TRANSACTION_SIZE as u64))?),
416 }
417 })?;
418
419 self.ledger.check_transaction_basic(transaction_id, transaction).await?;
421 if self.ready.write().insert(transmission_id, transmission) {
423 trace!(
424 "Worker {} - Added unconfirmed transaction '{}'",
425 self.id,
426 self.format_transmission_id(transmission_id),
427 );
428 }
429 Ok(true)
430 }
431}
432
433impl<N: Network> Worker<N> {
434 fn start_handlers(&self, receiver: WorkerReceiver<N>) {
436 let WorkerReceiver { mut rx_worker_ping, mut rx_transmission_request, mut rx_transmission_response } = receiver;
437
438 let self_ = self.clone();
440 self.spawn(async move {
441 loop {
442 tokio::time::sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await;
444
445 let self__ = self_.clone();
447 let _ = spawn_blocking!({
448 self__.pending.clear_expired_callbacks();
449 Ok(())
450 });
451 }
452 });
453
454 let self_ = self.clone();
456 self.spawn(async move {
457 while let Some((peer_ip, transmission_id)) = rx_worker_ping.recv().await {
458 self_.process_transmission_id_from_ping(peer_ip, transmission_id);
459 }
460 });
461
462 let self_ = self.clone();
464 self.spawn(async move {
465 while let Some((peer_ip, transmission_request)) = rx_transmission_request.recv().await {
466 self_.send_transmission_response(peer_ip, transmission_request);
467 }
468 });
469
470 let self_ = self.clone();
472 self.spawn(async move {
473 while let Some((peer_ip, transmission_response)) = rx_transmission_response.recv().await {
474 let self__ = self_.clone();
476 let _ = spawn_blocking!({
477 self__.finish_transmission_request(peer_ip, transmission_response);
478 Ok(())
479 });
480 }
481 });
482 }
483
484 async fn send_transmission_request(
486 &self,
487 peer_ip: SocketAddr,
488 transmission_id: TransmissionID<N>,
489 ) -> Result<(TransmissionID<N>, Transmission<N>)> {
490 let (callback_sender, callback_receiver) = oneshot::channel();
492 let num_sent_requests = self.pending.num_sent_requests(transmission_id);
494 let contains_peer_with_sent_request = self.pending.contains_peer_with_sent_request(transmission_id, peer_ip);
496 let num_redundant_requests = max_redundant_requests(self.ledger.clone(), self.storage.current_round())?;
498 let should_send_request = num_sent_requests < num_redundant_requests && !contains_peer_with_sent_request;
501
502 self.pending.insert(transmission_id, peer_ip, Some((callback_sender, should_send_request)));
504
505 if should_send_request {
507 trace!("Requesting transmission {} from peer '{peer_ip}'", self.format_transmission_id(transmission_id));
508 if self.gateway.send(peer_ip, Event::TransmissionRequest(transmission_id.into())).await.is_none() {
510 bail!(
511 "Unable to fetch transmission {} - failed to send request",
512 self.format_transmission_id(transmission_id)
513 )
514 }
515 } else {
516 debug!(
517 "Skipped sending request for transmission {} to '{peer_ip}' ({num_sent_requests} redundant requests)",
518 self.format_transmission_id(transmission_id)
519 );
520 }
521 let transmission = timeout(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS), callback_receiver)
524 .await
525 .with_context(|| {
526 format!("Unable to fetch transmission {} (timeout)", self.format_transmission_id(transmission_id))
527 })?
528 .with_context(|| {
529 format!("Unable to fetch transmission {}", self.format_transmission_id(transmission_id))
530 })?;
531
532 Ok((transmission_id, transmission))
533 }
534
535 fn finish_transmission_request(&self, peer_ip: SocketAddr, response: TransmissionResponse<N>) {
538 let TransmissionResponse { transmission_id, mut transmission } = response;
539 let exists = self.pending.get_peers(transmission_id).unwrap_or_default().contains(&peer_ip);
541 if exists {
543 match self.ledger.ensure_transmission_is_well_formed(transmission_id, &mut transmission) {
545 Ok(()) => {
546 trace!(
547 "Received valid transmission response from peer '{peer_ip}' for transmission '{}'",
548 self.format_transmission_id(transmission_id)
549 );
550 self.pending.remove(transmission_id, Some(transmission));
552 }
553 Err(err) => warn!("Failed to finish transmission response from peer '{peer_ip}': {err}"),
554 };
555 }
556 }
557
558 fn send_transmission_response(&self, peer_ip: SocketAddr, request: TransmissionRequest<N>) {
560 let TransmissionRequest { transmission_id } = request;
561 if let Some(transmission) = self.get_transmission(transmission_id) {
563 let self_ = self.clone();
565 tokio::spawn(async move {
566 self_.gateway.send(peer_ip, Event::TransmissionResponse((transmission_id, transmission).into())).await;
567 });
568 }
569 }
570
571 fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
573 self.handles.lock().push(tokio::spawn(future));
574 }
575
576 pub(crate) fn shut_down(&self) {
578 trace!("Shutting down worker {}...", self.id);
579 self.handles.lock().iter().for_each(|handle| handle.abort());
581 }
582}
583
584#[cfg(test)]
585mod tests {
586 use super::*;
587 use crate::helpers::CALLBACK_EXPIRATION_IN_SECS;
588 use snarkos_node_bft_ledger_service::LedgerService;
589 use snarkos_node_bft_storage_service::BFTMemoryService;
590 use snarkvm::{
591 console::{network::Network, types::Field},
592 ledger::{
593 Block,
594 CheckBlockError,
595 PendingBlock,
596 committee::Committee,
597 narwhal::{BatchCertificate, Subdag, Transmission, TransmissionID},
598 test_helpers::sample_execution_transaction_with_fee,
599 },
600 prelude::Address,
601 };
602
603 use bytes::Bytes;
604 use indexmap::IndexMap;
605 use mockall::mock;
606 use std::{io, ops::Range};
607
608 type CurrentNetwork = snarkvm::prelude::MainnetV0;
609
610 const ITERATIONS: usize = 100;
611
612 mock! {
613 Gateway<N: Network> {}
614 #[async_trait]
615 impl<N:Network> Transport<N> for Gateway<N> {
616 fn broadcast(&self, event: Event<N>);
617 async fn send(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>>;
618 }
619 }
620
621 mock! {
622 #[derive(Debug)]
623 Ledger<N: Network> {}
624 #[async_trait]
625 impl<N: Network> LedgerService<N> for Ledger<N> {
626 fn latest_round(&self) -> u64;
627 fn latest_block_height(&self) -> u32;
628 fn latest_block(&self) -> Block<N>;
629 fn latest_restrictions_id(&self) -> Field<N>;
630 fn latest_leader(&self) -> Option<(u64, Address<N>)>;
631 fn update_latest_leader(&self, round: u64, leader: Address<N>);
632 fn contains_block_height(&self, height: u32) -> bool;
633 fn get_block_height(&self, hash: &N::BlockHash) -> Result<u32>;
634 fn get_block_hash(&self, height: u32) -> Result<N::BlockHash>;
635 fn get_block_round(&self, height: u32) -> Result<u64>;
636 fn get_block(&self, height: u32) -> Result<Block<N>>;
637 fn get_blocks(&self, heights: Range<u32>) -> Result<Vec<Block<N>>>;
638 fn get_solution(&self, solution_id: &SolutionID<N>) -> Result<Solution<N>>;
639 fn get_unconfirmed_transaction(&self, transaction_id: N::TransactionID) -> Result<Transaction<N>>;
640 fn get_batch_certificate(&self, certificate_id: &Field<N>) -> Result<BatchCertificate<N>>;
641 fn current_committee(&self) -> Result<Committee<N>>;
642 fn get_committee_for_round(&self, round: u64) -> Result<Committee<N>>;
643 fn get_committee_lookback_for_round(&self, round: u64) -> Result<Committee<N>>;
644 fn contains_certificate(&self, certificate_id: &Field<N>) -> Result<bool>;
645 fn contains_transmission(&self, transmission_id: &TransmissionID<N>) -> Result<bool>;
646 fn ensure_transmission_is_well_formed(
647 &self,
648 transmission_id: TransmissionID<N>,
649 transmission: &mut Transmission<N>,
650 ) -> Result<()>;
651 async fn check_solution_basic(
652 &self,
653 solution_id: SolutionID<N>,
654 solution: Data<Solution<N>>,
655 ) -> Result<()>;
656 async fn check_transaction_basic(
657 &self,
658 transaction_id: N::TransactionID,
659 transaction: Transaction<N>,
660 ) -> Result<()>;
661 fn check_block_subdag(&self, _block: Block<N>, _prefix: &[PendingBlock<N>]) -> std::result::Result<PendingBlock<N>, CheckBlockError<N>>;
662 fn check_block_content(&self, _block: PendingBlock<N>) -> std::result::Result<Block<N>, CheckBlockError<N>>;
663 fn check_next_block(&self, block: &Block<N>) -> Result<()>;
664 fn prepare_advance_to_next_quorum_block(
665 &self,
666 subdag: Subdag<N>,
667 transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
668 ) -> Result<Block<N>>;
669 fn advance_to_next_block(&self, block: &Block<N>) -> Result<()>;
670 fn transaction_spend_in_microcredits(&self, transaction: &Transaction<N>, consensus_version: ConsensusVersion) -> Result<u64>;
671 }
672 }
673
674 #[tokio::test]
675 async fn test_max_redundant_requests() {
676 let num_nodes: u16 = CurrentNetwork::MAX_CERTIFICATES.first().unwrap().1;
677
678 let rng = &mut TestRng::default();
679 let committee =
681 snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_size(0, num_nodes, rng);
682 let committee_clone = committee.clone();
683 let mut mock_ledger = MockLedger::default();
685 mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
686 mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
687 mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
688 mock_ledger.expect_check_solution_basic().returning(|_, _| Ok(()));
689 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
690
691 assert_eq!(max_redundant_requests(ledger, 0).unwrap(), 6, "Update me if the formula changes");
693 }
694
695 #[tokio::test]
696 async fn test_process_transmission() {
697 let rng = &mut TestRng::default();
698 let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
700 let committee_clone = committee.clone();
701 let gateway = MockGateway::default();
703 let mut mock_ledger = MockLedger::default();
704 mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
705 mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
706 mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
707 mock_ledger.expect_check_solution_basic().returning(|_, _| Ok(()));
708 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
709 let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
711
712 let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
714 let data =
715 |rng: &mut TestRng| Data::Buffer(Bytes::from((0..512).map(|_| rng.r#gen::<u8>()).collect::<Vec<_>>()));
716 let transmission_id = TransmissionID::Solution(
717 rng.r#gen::<u64>().into(),
718 rng.r#gen::<<CurrentNetwork as Network>::TransmissionChecksum>(),
719 );
720 let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
721 let transmission = Transmission::Solution(data(rng));
722
723 worker.process_transmission_from_peer(peer_ip, transmission_id, transmission.clone());
725 assert!(worker.contains_transmission(transmission_id));
726 assert!(worker.ready.read().contains(transmission_id));
727 assert_eq!(worker.get_transmission(transmission_id), Some(transmission));
728 assert!(worker.ready.write().remove_front().is_some());
730 assert!(!worker.ready.read().contains(transmission_id));
731 }
732
733 #[tokio::test]
734 async fn test_send_transmission() {
735 let rng = &mut TestRng::default();
736 let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
738 let committee_clone = committee.clone();
739 let mut gateway = MockGateway::default();
741 gateway.expect_send().returning(|_, _| {
742 let (_tx, rx) = oneshot::channel();
743 Some(rx)
744 });
745 let mut mock_ledger = MockLedger::default();
746 mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
747 mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
748 mock_ledger.expect_ensure_transmission_is_well_formed().returning(|_, _| Ok(()));
749 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
750 let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
752
753 let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
755 let transmission_id = TransmissionID::Solution(
756 rng.r#gen::<u64>().into(),
757 rng.r#gen::<<CurrentNetwork as Network>::TransmissionChecksum>(),
758 );
759 let worker_ = worker.clone();
760 let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
761 let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
762 assert!(worker.pending.contains(transmission_id));
763 let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
764 worker.finish_transmission_request(peer_ip, TransmissionResponse {
766 transmission_id,
767 transmission: Transmission::Solution(Data::Buffer(Bytes::from(vec![0; 512]))),
768 });
769 assert!(!worker.pending.contains(transmission_id));
771 }
772
773 #[tokio::test]
774 async fn test_process_solution_ok() {
775 let rng = &mut TestRng::default();
776 let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
778 let committee_clone = committee.clone();
779 let mut gateway = MockGateway::default();
781 gateway.expect_send().returning(|_, _| {
782 let (_tx, rx) = oneshot::channel();
783 Some(rx)
784 });
785 let mut mock_ledger = MockLedger::default();
786 mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
787 mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
788 mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
789 mock_ledger.expect_check_solution_basic().returning(|_, _| Ok(()));
790 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
791 let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
793
794 let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
796 let solution = Data::Buffer(Bytes::from((0..512).map(|_| rng.r#gen::<u8>()).collect::<Vec<_>>()));
797 let solution_id = rng.r#gen::<u64>().into();
798 let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
799 let transmission_id = TransmissionID::Solution(solution_id, solution_checksum);
800 let worker_ = worker.clone();
801 let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
802 let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
803 assert!(worker.pending.contains(transmission_id));
804 let result = worker.process_unconfirmed_solution(solution_id, solution).await;
805 assert!(result.is_ok());
806 assert!(!worker.pending.contains(transmission_id));
807 assert!(worker.ready.read().contains(transmission_id));
808 }
809
810 #[tokio::test]
811 async fn test_process_solution_nok() {
812 let rng = &mut TestRng::default();
813 let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
815 let committee_clone = committee.clone();
816 let mut gateway = MockGateway::default();
818 gateway.expect_send().returning(|_, _| {
819 let (_tx, rx) = oneshot::channel();
820 Some(rx)
821 });
822 let mut mock_ledger = MockLedger::default();
823 mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
824 mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
825 mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
826 mock_ledger.expect_check_solution_basic().returning(|_, _| Err(anyhow!("")));
827 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
828 let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
830
831 let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
833 let solution_id = rng.r#gen::<u64>().into();
834 let solution = Data::Buffer(Bytes::from((0..512).map(|_| rng.r#gen::<u8>()).collect::<Vec<_>>()));
835 let checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
836 let transmission_id = TransmissionID::Solution(solution_id, checksum);
837 let worker_ = worker.clone();
838 let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
839 let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
840 assert!(worker.pending.contains(transmission_id));
841 let result = worker.process_unconfirmed_solution(solution_id, solution).await;
842 assert!(result.is_err());
843 assert!(!worker.pending.contains(transmission_id));
844 assert!(!worker.ready.read().contains(transmission_id));
845 }
846
847 #[tokio::test]
848 async fn test_process_transaction_ok() {
849 let rng = &mut TestRng::default();
850 let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
852 let committee_clone = committee.clone();
853 let mut gateway = MockGateway::default();
855 gateway.expect_send().returning(|_, _| {
856 let (_tx, rx) = oneshot::channel();
857 Some(rx)
858 });
859 let mut mock_ledger = MockLedger::default();
860 mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
861 mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
862 mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
863 mock_ledger.expect_check_transaction_basic().returning(|_, _| Ok(()));
864 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
865 let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
867
868 let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
870 let transaction = sample_execution_transaction_with_fee(false, rng, 0);
871 let transaction_id = transaction.id();
872 let transaction_data = Data::Object(transaction);
873 let checksum = transaction_data.to_checksum::<CurrentNetwork>().unwrap();
874 let transmission_id = TransmissionID::Transaction(transaction_id, checksum);
875 let worker_ = worker.clone();
876 let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
877 let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
878 assert!(worker.pending.contains(transmission_id));
879 let result = worker.process_unconfirmed_transaction(transaction_id, transaction_data).await;
880 assert!(result.is_ok());
881 assert!(!worker.pending.contains(transmission_id));
882 assert!(worker.ready.read().contains(transmission_id));
883 }
884
885 #[tokio::test]
886 async fn test_process_transaction_nok() {
887 let mut rng = &mut TestRng::default();
888 let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
890 let committee_clone = committee.clone();
891 let mut gateway = MockGateway::default();
893 gateway.expect_send().returning(|_, _| {
894 let (_tx, rx) = oneshot::channel();
895 Some(rx)
896 });
897 let mut mock_ledger = MockLedger::default();
898 mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
899 mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
900 mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
901 mock_ledger.expect_check_transaction_basic().returning(|_, _| Err(anyhow!("")));
902 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
903 let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
905
906 let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
908 let transaction_id: <CurrentNetwork as Network>::TransactionID = Field::<CurrentNetwork>::rand(&mut rng).into();
909 let transaction = Data::Buffer(Bytes::from((0..512).map(|_| rng.r#gen::<u8>()).collect::<Vec<_>>()));
910 let checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
911 let transmission_id = TransmissionID::Transaction(transaction_id, checksum);
912 let worker_ = worker.clone();
913 let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
914 let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
915 assert!(worker.pending.contains(transmission_id));
916 let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
917 assert!(result.is_err());
918 assert!(!worker.pending.contains(transmission_id));
919 assert!(!worker.ready.read().contains(transmission_id));
920 }
921
922 #[tokio::test]
923 async fn test_flood_transmission_requests() {
924 let rng = &mut TestRng::default();
925 let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
927 let committee_clone = committee.clone();
928 let mut gateway = MockGateway::default();
930 gateway.expect_send().returning(|_, _| {
931 let (_tx, rx) = oneshot::channel();
932 Some(rx)
933 });
934 let mut mock_ledger = MockLedger::default();
935 mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
936 mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
937 mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
938 mock_ledger.expect_check_transaction_basic().returning(|_, _| Ok(()));
939 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
940 let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
942
943 let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
945 let transaction = sample_execution_transaction_with_fee(false, rng, 0);
946 let transaction_id = transaction.id();
947 let transaction_data = Data::Object(transaction);
948 let checksum = transaction_data.to_checksum::<CurrentNetwork>().unwrap();
949 let transmission_id = TransmissionID::Transaction(transaction_id, checksum);
950
951 let num_redundant_requests =
953 max_redundant_requests(worker.ledger.clone(), worker.storage.current_round()).unwrap();
954 let num_flood_requests = num_redundant_requests * 10;
955 let mut peer_ips =
956 (0..num_flood_requests).map(|i| SocketAddr::from(([127, 0, 0, 1], 1234 + i as u16))).collect_vec();
957 let first_peer_ip = peer_ips[0];
958
959 for i in 1..=num_flood_requests {
961 let worker_ = worker.clone();
962 let peer_ip = peer_ips.pop().unwrap();
963 tokio::spawn(async move {
964 let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
965 });
966 tokio::time::sleep(Duration::from_millis(10)).await;
967 assert!(worker.pending.num_sent_requests(transmission_id) <= num_redundant_requests);
969 assert_eq!(worker.pending.num_callbacks(transmission_id), i);
970 }
971 assert_eq!(worker.pending.num_sent_requests(transmission_id), num_redundant_requests);
973 assert_eq!(worker.pending.num_callbacks(transmission_id), num_flood_requests);
974
975 tokio::time::sleep(Duration::from_secs(CALLBACK_EXPIRATION_IN_SECS as u64 + 1)).await;
977 assert_eq!(worker.pending.num_sent_requests(transmission_id), 0);
978 assert_eq!(worker.pending.num_callbacks(transmission_id), 0);
979
980 for i in 1..=num_flood_requests {
982 let worker_ = worker.clone();
983 tokio::spawn(async move {
984 let _ = worker_.send_transmission_request(first_peer_ip, transmission_id).await;
985 });
986 tokio::time::sleep(Duration::from_millis(10)).await;
987 assert!(worker.pending.num_sent_requests(transmission_id) <= num_redundant_requests);
988 assert_eq!(worker.pending.num_callbacks(transmission_id), i);
989 }
990 assert_eq!(worker.pending.num_sent_requests(transmission_id), 1);
992 assert_eq!(worker.pending.num_callbacks(transmission_id), num_flood_requests);
993
994 let result = worker.process_unconfirmed_transaction(transaction_id, transaction_data).await;
996 assert!(result.is_ok());
997 assert_eq!(worker.pending.num_sent_requests(transmission_id), 0);
998 assert_eq!(worker.pending.num_callbacks(transmission_id), 0);
999 assert!(!worker.pending.contains(transmission_id));
1000 assert!(worker.ready.read().contains(transmission_id));
1001 }
1002
1003 #[tokio::test]
1004 async fn test_storage_gc_on_initialization() {
1005 let rng = &mut TestRng::default();
1006
1007 for _ in 0..ITERATIONS {
1008 let max_gc_rounds = rng.gen_range(50..=100);
1010 let latest_ledger_round = rng.gen_range((max_gc_rounds + 1)..1000);
1011 let expected_gc_round = latest_ledger_round - max_gc_rounds;
1012
1013 let committee =
1015 snarkvm::ledger::committee::test_helpers::sample_committee_for_round(latest_ledger_round, rng);
1016
1017 let mut mock_ledger = MockLedger::default();
1019 mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
1020
1021 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
1022 let storage =
1024 Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1025
1026 assert_eq!(storage.gc_round(), expected_gc_round);
1028 }
1029 }
1030}
1031
1032#[cfg(test)]
1033mod prop_tests {
1034 use super::*;
1035 use crate::Gateway;
1036 use snarkos_node_bft_ledger_service::MockLedgerService;
1037 use snarkvm::{
1038 console::account::Address,
1039 ledger::committee::{Committee, MIN_VALIDATOR_STAKE},
1040 };
1041
1042 use test_strategy::proptest;
1043
1044 type CurrentNetwork = snarkvm::prelude::MainnetV0;
1045
1046 fn new_test_committee(n: u16) -> Committee<CurrentNetwork> {
1048 let mut members = IndexMap::with_capacity(n as usize);
1049 for i in 0..n {
1050 let rng = &mut TestRng::fixed(i as u64);
1052 let address = Address::new(rng.r#gen());
1053 info!("Validator {i}: {address}");
1054 members.insert(address, (MIN_VALIDATOR_STAKE, false, rng.gen_range(0..100)));
1055 }
1056 Committee::<CurrentNetwork>::new(1u64, members).unwrap()
1058 }
1059
1060 #[proptest]
1061 fn worker_initialization(
1062 #[strategy(0..MAX_WORKERS)] id: u8,
1063 gateway: Gateway<CurrentNetwork>,
1064 storage: Storage<CurrentNetwork>,
1065 ) {
1066 let committee = new_test_committee(4);
1067 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(MockLedgerService::new(committee));
1068 let worker = Worker::new(id, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
1069 assert_eq!(worker.id(), id);
1070 }
1071
1072 #[proptest]
1073 fn invalid_worker_id(
1074 #[strategy(MAX_WORKERS..)] id: u8,
1075 gateway: Gateway<CurrentNetwork>,
1076 storage: Storage<CurrentNetwork>,
1077 ) {
1078 let committee = new_test_committee(4);
1079 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(MockLedgerService::new(committee));
1080 let worker = Worker::new(id, Arc::new(gateway), storage, ledger, Default::default());
1081 if let Err(error) = worker {
1083 assert_eq!(error.to_string(), format!("Invalid worker ID '{id}'"));
1084 }
1085 }
1086}