1#[cfg(not(test))]
17use crate::Gateway;
18use crate::{
19 MAX_FETCH_TIMEOUT,
20 MAX_WORKERS,
21 ProposedBatch,
22 ProposedBatchState,
23 Transport,
24 events::{Event, TransmissionRequest, TransmissionResponse},
25 helpers::{Pending, Ready, Storage, WorkerReceiver, fmt_id, max_redundant_requests},
26 spawn_blocking,
27};
28use snarkos_node_bft_ledger_service::LedgerService;
29use snarkvm::{
30 console::prelude::*,
31 ledger::{
32 Transaction,
33 narwhal::{BatchHeader, Data, Transmission, TransmissionID},
34 puzzle::{Solution, SolutionID},
35 },
36};
37
38use anyhow::Context;
39use colored::{ColoredString, Colorize};
40use indexmap::{IndexMap, IndexSet};
41#[cfg(feature = "locktick")]
42use locktick::parking_lot::{Mutex, RwLock};
43#[cfg(not(feature = "locktick"))]
44use parking_lot::{Mutex, RwLock};
45use rand::seq::IteratorRandom;
46
47use std::{future::Future, net::SocketAddr, sync::Arc};
48use tokio::{sync::oneshot, task::JoinHandle, time::timeout};
49
50#[derive(Clone)]
53pub struct Worker<N: Network> {
54 id: u8,
56 #[cfg(not(test))]
58 gateway: Arc<Gateway<N>>,
59 #[cfg(test)]
60 gateway: Arc<dyn Transport<N>>,
61 storage: Storage<N>,
63 ledger: Arc<dyn LedgerService<N>>,
65 proposed_batch: Arc<ProposedBatch<N>>,
67 ready: Arc<RwLock<Ready<N>>>,
69 pending: Arc<Pending<TransmissionID<N>, Transmission<N>>>,
71 handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
73}
74
75impl<N: Network> Worker<N> {
76 pub fn new(
78 id: u8,
79 #[cfg(not(test))] gateway: Arc<Gateway<N>>,
80 #[cfg(test)] gateway: Arc<dyn Transport<N>>,
81 storage: Storage<N>,
82 ledger: Arc<dyn LedgerService<N>>,
83 proposed_batch: Arc<ProposedBatch<N>>,
84 ) -> Result<Self> {
85 ensure!(id < MAX_WORKERS, "Invalid worker ID '{id}'");
87 Ok(Self {
89 id,
90 gateway,
91 storage,
92 ledger,
93 proposed_batch,
94 ready: Default::default(),
95 pending: Default::default(),
96 handles: Default::default(),
97 })
98 }
99
100 pub fn run(&self, receiver: WorkerReceiver<N>) {
102 info!("Starting worker instance {} of the memory pool...", self.id);
103 self.start_handlers(receiver);
105 }
106
107 pub const fn id(&self) -> u8 {
109 self.id
110 }
111
112 pub fn pending(&self) -> &Arc<Pending<TransmissionID<N>, Transmission<N>>> {
114 &self.pending
115 }
116}
117
118impl<N: Network> Worker<N> {
119 pub const MAX_TRANSMISSIONS_PER_WORKER: usize =
121 BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH / MAX_WORKERS as usize;
122 pub const MAX_TRANSMISSIONS_PER_WORKER_PING: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH / 10;
124
125 pub fn num_transmissions(&self) -> usize {
127 self.ready.read().num_transmissions()
128 }
129
130 pub fn num_ratifications(&self) -> usize {
132 self.ready.read().num_ratifications()
133 }
134
135 pub fn num_solutions(&self) -> usize {
137 self.ready.read().num_solutions()
138 }
139
140 pub fn num_transactions(&self) -> usize {
142 self.ready.read().num_transactions()
143 }
144}
145
146impl<N: Network> Worker<N> {
147 pub fn transmission_ids(&self) -> IndexSet<TransmissionID<N>> {
149 self.ready.read().transmission_ids()
150 }
151
152 pub fn transmissions(&self) -> IndexMap<TransmissionID<N>, Transmission<N>> {
154 self.ready.read().transmissions()
155 }
156
157 pub fn solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
159 self.ready.read().solutions().into_iter()
160 }
161
162 pub fn transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
164 self.ready.read().transactions().into_iter()
165 }
166}
167
168impl<N: Network> Worker<N> {
169 pub(super) fn clear_solutions(&self) {
171 self.ready.write().clear_solutions()
172 }
173}
174
175impl<N: Network> Worker<N> {
176 fn format_transmission_id(&self, transmission_id: TransmissionID<N>) -> ColoredString {
178 if let Some(checksum) = transmission_id.checksum() {
179 format!("{}:{}", fmt_id(transmission_id), fmt_id(checksum))
181 } else {
182 fmt_id(transmission_id)
183 }
184 .dimmed()
185 }
186
187 pub fn contains_transmission(&self, transmission_id: impl Into<TransmissionID<N>>) -> bool {
189 let transmission_id = transmission_id.into();
190 self.ready.read().contains(transmission_id)
192 || matches!(&*self.proposed_batch.read(), ProposedBatchState::Certifying(p) if p.contains_transmission(transmission_id))
193 || self.storage.contains_transmission(transmission_id)
194 || self.ledger.contains_transmission(&transmission_id).unwrap_or(false)
195 }
196
197 pub fn get_transmission(&self, transmission_id: TransmissionID<N>) -> Option<Transmission<N>> {
202 if let Some(transmission) = self.ready.read().get(transmission_id) {
204 return Some(transmission);
205 }
206 if let Some(transmission) = self.storage.get_transmission(transmission_id) {
208 return Some(transmission);
209 }
210 if let Some(transmission) = match &*self.proposed_batch.read() {
212 ProposedBatchState::Certifying(p) => p.get_transmission(transmission_id),
213 _ => None,
214 } {
215 return Some(transmission.clone());
216 }
217 None
218 }
219
220 pub async fn get_or_fetch_transmission(
222 &self,
223 peer_ip: SocketAddr,
224 transmission_id: TransmissionID<N>,
225 ) -> Result<(TransmissionID<N>, Transmission<N>)> {
226 if let Some(transmission) = self.get_transmission(transmission_id) {
228 return Ok((transmission_id, transmission));
229 }
230 let (candidate_id, transmission) = self.send_transmission_request(peer_ip, transmission_id).await?;
232 ensure!(candidate_id == transmission_id, "Invalid transmission ID");
234 Ok((transmission_id, transmission))
236 }
237
238 pub(crate) fn insert_front(&self, key: TransmissionID<N>, value: Transmission<N>) {
240 self.ready.write().insert_front(key, value);
241 }
242
243 pub(crate) fn remove_front(&self) -> Option<(TransmissionID<N>, Transmission<N>)> {
245 self.ready.write().remove_front()
246 }
247
248 pub(crate) fn reinsert(&self, transmission_id: TransmissionID<N>, transmission: Transmission<N>) -> bool {
250 if !self.contains_transmission(transmission_id) {
252 return self.ready.write().insert(transmission_id, transmission);
254 }
255 false
256 }
257
258 pub(crate) fn broadcast_ping(&self) {
260 let transmission_ids = self
262 .ready
263 .read()
264 .transmission_ids()
265 .into_iter()
266 .sample(&mut rand::rng(), Self::MAX_TRANSMISSIONS_PER_WORKER_PING)
267 .into_iter()
268 .collect::<IndexSet<_>>();
269
270 if !transmission_ids.is_empty() {
272 self.gateway.broadcast(Event::WorkerPing(transmission_ids.into()));
273 }
274 }
275}
276
277impl<N: Network> Worker<N> {
278 fn process_transmission_id_from_ping(&self, peer_ip: SocketAddr, transmission_id: TransmissionID<N>) {
280 if self.contains_transmission(transmission_id) {
282 return;
283 }
284 if self.ready.read().num_transmissions() > Self::MAX_TRANSMISSIONS_PER_WORKER {
287 return;
288 }
289 let self_ = self.clone();
291 tokio::spawn(async move {
292 match self_.send_transmission_request(peer_ip, transmission_id).await {
294 Ok((candidate_id, transmission)) => {
296 if candidate_id == transmission_id {
298 self_.process_transmission_from_peer(peer_ip, transmission_id, transmission);
302 }
303 }
304 Err(e) => {
306 warn!(
307 "Worker {} - Failed to fetch transmission '{}' from '{peer_ip}' (ping) - {e}",
308 self_.id,
309 self_.format_transmission_id(transmission_id),
310 );
311 }
312 }
313 });
314 }
315
316 pub(crate) fn process_transmission_from_peer(
318 &self,
319 peer_ip: SocketAddr,
320 transmission_id: TransmissionID<N>,
321 transmission: Transmission<N>,
322 ) {
323 if self.contains_transmission(transmission_id) {
325 return;
326 }
327 let is_well_formed = match (&transmission_id, &transmission) {
329 (TransmissionID::Solution(_, _), Transmission::Solution(_)) => true,
330 (TransmissionID::Transaction(_, _), Transmission::Transaction(_)) => true,
331 (TransmissionID::Ratification, Transmission::Ratification) => false,
334 _ => false,
336 };
337 if let (TransmissionID::Transaction(tx_id, _), Transmission::Transaction(Data::Object(tx))) =
340 (transmission_id, &transmission)
341 && tx.is_execute()
342 {
343 let self_ = self.clone();
344 let tx_ = tx.clone();
345 tokio::spawn(async move {
346 let _ = self_.ledger.check_transaction_basic(tx_id, tx_).await;
347 });
348 }
349 if is_well_formed && self.ready.write().insert(transmission_id, transmission) {
351 trace!(
352 "Worker {} - Added transmission '{}' from '{peer_ip}'",
353 self.id,
354 self.format_transmission_id(transmission_id),
355 );
356 }
357 }
358
359 pub(crate) async fn process_unconfirmed_solution(
367 &self,
368 solution_id: SolutionID<N>,
369 solution: Data<Solution<N>>,
370 ) -> Result<bool> {
371 let transmission = Transmission::Solution(solution.clone());
373 let checksum = solution.to_checksum::<N>()?;
375 let transmission_id = TransmissionID::Solution(solution_id, checksum);
377 self.pending.remove(transmission_id, Some(transmission.clone()));
379 if self.contains_transmission(transmission_id) {
381 return Ok(false);
382 }
383 self.ledger.check_solution_basic(solution_id, solution).await?;
385 if self.ready.write().insert(transmission_id, transmission) {
387 trace!(
388 "Worker {} - Added unconfirmed solution '{}'",
389 self.id,
390 self.format_transmission_id(transmission_id),
391 );
392 }
393 Ok(true)
394 }
395
396 pub(crate) async fn process_unconfirmed_transaction(
403 &self,
404 transaction_id: N::TransactionID,
405 transaction: Data<Transaction<N>>,
406 ) -> Result<bool> {
407 let transmission = Transmission::Transaction(transaction.clone());
409 let checksum = transaction.to_checksum::<N>()?;
411 let transmission_id = TransmissionID::Transaction(transaction_id, checksum);
413 self.pending.remove(transmission_id, Some(transmission.clone()));
415 if self.contains_transmission(transmission_id) {
417 return Ok(false);
418 }
419 let transaction = spawn_blocking!({
421 match transaction {
422 Data::Object(transaction) => Ok(transaction),
423 Data::Buffer(bytes) => {
424 Ok(Transaction::<N>::read_le(&mut bytes.take(N::LATEST_MAX_TRANSACTION_SIZE() as u64))?)
425 }
426 }
427 })?;
428
429 self.ledger.check_transaction_basic(transaction_id, transaction).await?;
431 if self.ready.write().insert(transmission_id, transmission) {
433 trace!(
434 "Worker {} - Added unconfirmed transaction '{}'",
435 self.id,
436 self.format_transmission_id(transmission_id),
437 );
438 }
439 Ok(true)
440 }
441}
442
443impl<N: Network> Worker<N> {
444 fn start_handlers(&self, receiver: WorkerReceiver<N>) {
446 let WorkerReceiver { mut rx_worker_ping, mut rx_transmission_request, mut rx_transmission_response } = receiver;
447
448 let self_ = self.clone();
450 self.spawn(async move {
451 loop {
452 tokio::time::sleep(MAX_FETCH_TIMEOUT).await;
454
455 let self__ = self_.clone();
457 let _ = spawn_blocking!({
458 self__.pending.clear_expired_callbacks();
459 Ok(())
460 });
461 }
462 });
463
464 let self_ = self.clone();
466 self.spawn(async move {
467 while let Some((peer_ip, transmission_id)) = rx_worker_ping.recv().await {
468 self_.process_transmission_id_from_ping(peer_ip, transmission_id);
469 }
470 });
471
472 let self_ = self.clone();
474 self.spawn(async move {
475 while let Some((peer_ip, transmission_request)) = rx_transmission_request.recv().await {
476 self_.send_transmission_response(peer_ip, transmission_request);
477 }
478 });
479
480 let self_ = self.clone();
482 self.spawn(async move {
483 while let Some((peer_ip, transmission_response)) = rx_transmission_response.recv().await {
484 let self__ = self_.clone();
486 let _ = spawn_blocking!({
487 self__.finish_transmission_request(peer_ip, transmission_response);
488 Ok(())
489 });
490 }
491 });
492 }
493
494 async fn send_transmission_request(
496 &self,
497 peer_ip: SocketAddr,
498 transmission_id: TransmissionID<N>,
499 ) -> Result<(TransmissionID<N>, Transmission<N>)> {
500 let (callback_sender, callback_receiver) = oneshot::channel();
502 let num_sent_requests = self.pending.num_sent_requests(transmission_id);
504 let contains_peer_with_sent_request = self.pending.contains_peer_with_sent_request(transmission_id, peer_ip);
506 let num_redundant_requests = max_redundant_requests(self.ledger.clone(), self.storage.current_round())?;
508 #[cfg(test)]
510 let stake_redundancy_reached = || Ok::<_, anyhow::Error>(true);
511 #[cfg(not(test))]
512 let stake_redundancy_reached = || self.pending.request_stake_redundancy_reached(&self.gateway, transmission_id);
513 let should_send_request = !contains_peer_with_sent_request
517 && (num_sent_requests < num_redundant_requests || !stake_redundancy_reached()?);
518
519 self.pending.insert(transmission_id, peer_ip, Some((callback_sender, should_send_request)));
521
522 if should_send_request {
524 trace!("Requesting transmission {} from peer '{peer_ip}'", self.format_transmission_id(transmission_id));
525 if self.gateway.send(peer_ip, Event::TransmissionRequest(transmission_id.into())).await.is_none() {
527 bail!(
528 "Unable to fetch transmission {} - failed to send request",
529 self.format_transmission_id(transmission_id)
530 )
531 }
532 } else {
533 debug!(
534 "Skipped sending request for transmission {} to '{peer_ip}' ({num_sent_requests} redundant requests)",
535 self.format_transmission_id(transmission_id)
536 );
537 }
538
539 let transmission = timeout(MAX_FETCH_TIMEOUT, callback_receiver)
541 .await
542 .with_context(|| {
543 format!("Unable to fetch transmission {} (timeout)", self.format_transmission_id(transmission_id))
544 })?
545 .with_context(|| {
546 format!("Unable to fetch transmission {}", self.format_transmission_id(transmission_id))
547 })?;
548
549 Ok((transmission_id, transmission))
550 }
551
552 fn finish_transmission_request(&self, peer_ip: SocketAddr, response: TransmissionResponse<N>) {
555 let TransmissionResponse { transmission_id, mut transmission } = response;
556 let exists = self.pending.get_peers(transmission_id).unwrap_or_default().contains(&peer_ip);
558 if exists {
560 match self.ledger.ensure_transmission_is_well_formed(transmission_id, &mut transmission) {
562 Ok(()) => {
563 trace!(
564 "Received valid transmission response from peer '{peer_ip}' for transmission '{}'",
565 self.format_transmission_id(transmission_id)
566 );
567 self.pending.remove(transmission_id, Some(transmission));
569 }
570 Err(err) => warn!("Failed to finish transmission response from peer '{peer_ip}': {err}"),
571 };
572 }
573 }
574
575 fn send_transmission_response(&self, peer_ip: SocketAddr, request: TransmissionRequest<N>) {
577 let TransmissionRequest { transmission_id } = request;
578 if let Some(transmission) = self.get_transmission(transmission_id) {
580 let self_ = self.clone();
582 tokio::spawn(async move {
583 self_.gateway.send(peer_ip, Event::TransmissionResponse((transmission_id, transmission).into())).await;
584 });
585 }
586 }
587
588 fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
590 self.handles.lock().push(tokio::spawn(future));
591 }
592
593 pub(crate) fn shut_down(&self) {
595 trace!("Shutting down worker {}...", self.id);
596 self.handles.lock().iter().for_each(|handle| handle.abort());
598 }
599}
600
601#[cfg(test)]
602mod tests {
603 use super::*;
604 use crate::helpers::CALLBACK_EXPIRATION_IN_SECS;
605 use snarkos_node_bft_ledger_service::{BeginLedgerUpdateError, LedgerService, LedgerUpdateService};
606 use snarkos_node_bft_storage_service::BFTMemoryService;
607 use snarkvm::{
608 console::{network::Network, types::Field},
609 ledger::{
610 Block,
611 CheckBlockError,
612 PendingBlock,
613 committee::Committee,
614 narwhal::{BatchCertificate, Transmission, TransmissionID},
615 test_helpers::sample_execution_transaction_with_fee,
616 },
617 prelude::Address,
618 };
619
620 use bytes::Bytes;
621 use mockall::mock;
622 use rand::RngExt;
623 use std::{io, ops::Range, time::Duration};
624
625 type CurrentNetwork = snarkvm::prelude::MainnetV0;
626
627 const ITERATIONS: usize = 100;
628
629 mock! {
630 Gateway<N: Network> {}
631 #[async_trait]
632 impl<N:Network> Transport<N> for Gateway<N> {
633 fn broadcast(&self, event: Event<N>);
634 async fn send(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>>;
635 }
636 }
637
638 mock! {
639 #[derive(Debug)]
640 Ledger<N: Network> {}
641 #[async_trait]
642 impl<N: Network> LedgerService<N> for Ledger<N> {
643 fn latest_round(&self) -> u64;
644 fn latest_block_height(&self) -> u32;
645 fn latest_block(&self) -> Block<N>;
646 fn latest_restrictions_id(&self) -> Field<N>;
647 fn latest_leader(&self) -> Option<(u64, Address<N>)>;
648 fn update_latest_leader(&self, round: u64, leader: Address<N>);
649 fn contains_block_height(&self, height: u32) -> bool;
650 fn get_block_height(&self, hash: &N::BlockHash) -> Result<u32>;
651 fn get_block_hash(&self, height: u32) -> Result<N::BlockHash>;
652 fn get_block_round(&self, height: u32) -> Result<u64>;
653 fn get_block(&self, height: u32) -> Result<Block<N>>;
654 fn get_blocks(&self, heights: Range<u32>) -> Result<Vec<Block<N>>>;
655 fn get_solution(&self, solution_id: &SolutionID<N>) -> Result<Option<Solution<N>>>;
656 fn get_unconfirmed_transaction(&self, transaction_id: N::TransactionID) -> Result<Option<Transaction<N>>>;
657 fn get_batch_certificate(&self, certificate_id: &Field<N>) -> Result<BatchCertificate<N>>;
658 fn current_committee(&self) -> Result<Committee<N>>;
659 fn get_committee_for_round(&self, round: u64) -> Result<Committee<N>>;
660 fn get_committee_lookback_for_round(&self, round: u64) -> Result<Committee<N>>;
661 fn contains_certificate(&self, certificate_id: &Field<N>) -> Result<bool>;
662 fn contains_transmission(&self, transmission_id: &TransmissionID<N>) -> Result<bool>;
663 fn ensure_transmission_is_well_formed(
664 &self,
665 transmission_id: TransmissionID<N>,
666 transmission: &mut Transmission<N>,
667 ) -> Result<()>;
668 async fn check_solution_basic(
669 &self,
670 solution_id: SolutionID<N>,
671 solution: Data<Solution<N>>,
672 ) -> Result<()>;
673 async fn check_transaction_basic(
674 &self,
675 transaction_id: N::TransactionID,
676 transaction: Transaction<N>,
677 ) -> Result<()>;
678 fn check_block_subdag(&self, _block: Block<N>, _prefix: &[PendingBlock<N>]) -> Result<PendingBlock<N>, CheckBlockError<N>>;
679 fn begin_ledger_update<'a>(&'a self) -> Result<Box<dyn LedgerUpdateService<N> + 'a>, BeginLedgerUpdateError>;
680 fn transaction_spend_in_microcredits(&self, transaction: &Transaction<N>, consensus_version: ConsensusVersion) -> Result<u64>;
681 fn is_stopped(&self) -> bool;
682 }
683 }
684
685 #[tokio::test]
686 async fn test_max_redundant_requests() {
687 let num_nodes: u16 = CurrentNetwork::MAX_CERTIFICATES.first().unwrap().1;
688
689 let rng = &mut TestRng::default();
690 let committee =
692 snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_size(0, num_nodes, rng);
693 let committee_clone = committee.clone();
694 let mut mock_ledger = MockLedger::default();
696 mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
697 mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
698 mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
699 mock_ledger.expect_check_solution_basic().returning(|_, _| Ok(()));
700 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
701
702 assert_eq!(max_redundant_requests(ledger, 0).unwrap(), 6, "Update me if the formula changes");
704 }
705
706 #[tokio::test]
707 async fn test_process_transmission() {
708 let rng = &mut TestRng::default();
709 let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
711 let committee_clone = committee.clone();
712 let gateway = MockGateway::default();
714 let mut mock_ledger = MockLedger::default();
715 mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
716 mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
717 mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
718 mock_ledger.expect_check_solution_basic().returning(|_, _| Ok(()));
719 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
720 let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1).unwrap();
722
723 let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
725 let data =
726 |rng: &mut TestRng| Data::Buffer(Bytes::from((0..512).map(|_| rng.random::<u8>()).collect::<Vec<_>>()));
727 let transmission_id = TransmissionID::Solution(
728 rng.random::<u64>().into(),
729 rng.random::<<CurrentNetwork as Network>::TransmissionChecksum>(),
730 );
731 let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
732 let transmission = Transmission::Solution(data(rng));
733
734 worker.process_transmission_from_peer(peer_ip, transmission_id, transmission.clone());
736 assert!(worker.contains_transmission(transmission_id));
737 assert!(worker.ready.read().contains(transmission_id));
738 assert_eq!(worker.get_transmission(transmission_id), Some(transmission));
739 assert!(worker.ready.write().remove_front().is_some());
741 assert!(!worker.ready.read().contains(transmission_id));
742 }
743
744 #[tokio::test]
745 async fn test_send_transmission() {
746 let rng = &mut TestRng::default();
747 let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
749 let committee_clone = committee.clone();
750 let mut gateway = MockGateway::default();
752 gateway.expect_send().returning(|_, _| {
753 let (_tx, rx) = oneshot::channel();
754 Some(rx)
755 });
756 let mut mock_ledger = MockLedger::default();
757 mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
758 mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
759 mock_ledger.expect_ensure_transmission_is_well_formed().returning(|_, _| Ok(()));
760 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
761 let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1).unwrap();
763
764 let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
766 let transmission_id = TransmissionID::Solution(
767 rng.random::<u64>().into(),
768 rng.random::<<CurrentNetwork as Network>::TransmissionChecksum>(),
769 );
770 let worker_ = worker.clone();
771 let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
772 let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
773 assert!(worker.pending.contains(transmission_id));
774 let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
775 worker.finish_transmission_request(peer_ip, TransmissionResponse {
777 transmission_id,
778 transmission: Transmission::Solution(Data::Buffer(Bytes::from(vec![0; 512]))),
779 });
780 assert!(!worker.pending.contains(transmission_id));
782 }
783
784 #[tokio::test]
785 async fn test_process_solution_ok() {
786 let rng = &mut TestRng::default();
787 let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
789 let committee_clone = committee.clone();
790 let mut gateway = MockGateway::default();
792 gateway.expect_send().returning(|_, _| {
793 let (_tx, rx) = oneshot::channel();
794 Some(rx)
795 });
796 let mut mock_ledger = MockLedger::default();
797 mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
798 mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
799 mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
800 mock_ledger.expect_check_solution_basic().returning(|_, _| Ok(()));
801 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
802 let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1).unwrap();
804
805 let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
807 let solution = Data::Buffer(Bytes::from((0..512).map(|_| rng.random::<u8>()).collect::<Vec<_>>()));
808 let solution_id = rng.random::<u64>().into();
809 let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
810 let transmission_id = TransmissionID::Solution(solution_id, solution_checksum);
811 let worker_ = worker.clone();
812 let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
813 let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
814 assert!(worker.pending.contains(transmission_id));
815 let result = worker.process_unconfirmed_solution(solution_id, solution).await;
816 assert!(result.is_ok());
817 assert!(!worker.pending.contains(transmission_id));
818 assert!(worker.ready.read().contains(transmission_id));
819 }
820
821 #[tokio::test]
822 async fn test_process_solution_nok() {
823 let rng = &mut TestRng::default();
824 let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
826 let committee_clone = committee.clone();
827 let mut gateway = MockGateway::default();
829 gateway.expect_send().returning(|_, _| {
830 let (_tx, rx) = oneshot::channel();
831 Some(rx)
832 });
833 let mut mock_ledger = MockLedger::default();
834 mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
835 mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
836 mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
837 mock_ledger.expect_check_solution_basic().returning(|_, _| Err(anyhow!("")));
838 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
839 let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1).unwrap();
841
842 let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
844 let solution_id = rng.random::<u64>().into();
845 let solution = Data::Buffer(Bytes::from((0..512).map(|_| rng.random::<u8>()).collect::<Vec<_>>()));
846 let checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
847 let transmission_id = TransmissionID::Solution(solution_id, checksum);
848 let worker_ = worker.clone();
849 let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
850 let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
851 assert!(worker.pending.contains(transmission_id));
852 let result = worker.process_unconfirmed_solution(solution_id, solution).await;
853 assert!(result.is_err());
854 assert!(!worker.pending.contains(transmission_id));
855 assert!(!worker.ready.read().contains(transmission_id));
856 }
857
858 #[tokio::test]
859 async fn test_process_transaction_ok() {
860 let rng = &mut TestRng::default();
861 let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
863 let committee_clone = committee.clone();
864 let mut gateway = MockGateway::default();
866 gateway.expect_send().returning(|_, _| {
867 let (_tx, rx) = oneshot::channel();
868 Some(rx)
869 });
870 let mut mock_ledger = MockLedger::default();
871 mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
872 mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
873 mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
874 mock_ledger.expect_check_transaction_basic().returning(|_, _| Ok(()));
875 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
876 let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1).unwrap();
878
879 let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
881 let transaction = sample_execution_transaction_with_fee(false, rng, 0);
882 let transaction_id = transaction.id();
883 let transaction_data = Data::Object(transaction);
884 let checksum = transaction_data.to_checksum::<CurrentNetwork>().unwrap();
885 let transmission_id = TransmissionID::Transaction(transaction_id, checksum);
886 let worker_ = worker.clone();
887 let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
888 let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
889 assert!(worker.pending.contains(transmission_id));
890 let result = worker.process_unconfirmed_transaction(transaction_id, transaction_data).await;
891 assert!(result.is_ok());
892 assert!(!worker.pending.contains(transmission_id));
893 assert!(worker.ready.read().contains(transmission_id));
894 }
895
896 #[tokio::test]
897 async fn test_process_transaction_nok() {
898 let mut rng = &mut TestRng::default();
899 let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
901 let committee_clone = committee.clone();
902 let mut gateway = MockGateway::default();
904 gateway.expect_send().returning(|_, _| {
905 let (_tx, rx) = oneshot::channel();
906 Some(rx)
907 });
908 let mut mock_ledger = MockLedger::default();
909 mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
910 mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
911 mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
912 mock_ledger.expect_check_transaction_basic().returning(|_, _| Err(anyhow!("")));
913 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
914 let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1).unwrap();
916
917 let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
919 let transaction_id: <CurrentNetwork as Network>::TransactionID = Field::<CurrentNetwork>::rand(&mut rng).into();
920 let transaction = Data::Buffer(Bytes::from((0..512).map(|_| rng.random::<u8>()).collect::<Vec<_>>()));
921 let checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
922 let transmission_id = TransmissionID::Transaction(transaction_id, checksum);
923 let worker_ = worker.clone();
924 let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
925 let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
926 assert!(worker.pending.contains(transmission_id));
927 let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
928 assert!(result.is_err());
929 assert!(!worker.pending.contains(transmission_id));
930 assert!(!worker.ready.read().contains(transmission_id));
931 }
932
933 #[tokio::test]
934 async fn test_flood_transmission_requests() {
935 let rng = &mut TestRng::default();
936 let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
938 let committee_clone = committee.clone();
939 let mut gateway = MockGateway::default();
941 gateway.expect_send().returning(|_, _| {
942 let (_tx, rx) = oneshot::channel();
943 Some(rx)
944 });
945 let mut mock_ledger = MockLedger::default();
946 mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
947 mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
948 mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
949 mock_ledger.expect_check_transaction_basic().returning(|_, _| Ok(()));
950 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
951 let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1).unwrap();
953
954 let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
956 let transaction = sample_execution_transaction_with_fee(false, rng, 0);
957 let transaction_id = transaction.id();
958 let transaction_data = Data::Object(transaction);
959 let checksum = transaction_data.to_checksum::<CurrentNetwork>().unwrap();
960 let transmission_id = TransmissionID::Transaction(transaction_id, checksum);
961
962 let num_redundant_requests =
964 max_redundant_requests(worker.ledger.clone(), worker.storage.current_round()).unwrap();
965 let num_flood_requests = num_redundant_requests * 10;
966 let mut peer_ips =
967 (0..num_flood_requests).map(|i| SocketAddr::from(([127, 0, 0, 1], 1234 + i as u16))).collect_vec();
968 let first_peer_ip = peer_ips[0];
969
970 for i in 1..=num_flood_requests {
972 let worker_ = worker.clone();
973 let peer_ip = peer_ips.pop().unwrap();
974 tokio::spawn(async move {
975 let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
976 });
977 tokio::time::sleep(Duration::from_millis(10)).await;
978 assert!(worker.pending.num_sent_requests(transmission_id) <= num_redundant_requests);
980 assert_eq!(worker.pending.num_callbacks(transmission_id), i);
981 }
982 assert_eq!(worker.pending.num_sent_requests(transmission_id), num_redundant_requests);
984 assert_eq!(worker.pending.num_callbacks(transmission_id), num_flood_requests);
985
986 tokio::time::sleep(Duration::from_secs(CALLBACK_EXPIRATION_IN_SECS as u64 + 1)).await;
988 assert_eq!(worker.pending.num_sent_requests(transmission_id), 0);
989 assert_eq!(worker.pending.num_callbacks(transmission_id), 0);
990
991 for i in 1..=num_flood_requests {
993 let worker_ = worker.clone();
994 tokio::spawn(async move {
995 let _ = worker_.send_transmission_request(first_peer_ip, transmission_id).await;
996 });
997 tokio::time::sleep(Duration::from_millis(10)).await;
998 assert!(worker.pending.num_sent_requests(transmission_id) <= num_redundant_requests);
999 assert_eq!(worker.pending.num_callbacks(transmission_id), i);
1000 }
1001 assert_eq!(worker.pending.num_sent_requests(transmission_id), 1);
1003 assert_eq!(worker.pending.num_callbacks(transmission_id), num_flood_requests);
1004
1005 let result = worker.process_unconfirmed_transaction(transaction_id, transaction_data).await;
1007 assert!(result.is_ok());
1008 assert_eq!(worker.pending.num_sent_requests(transmission_id), 0);
1009 assert_eq!(worker.pending.num_callbacks(transmission_id), 0);
1010 assert!(!worker.pending.contains(transmission_id));
1011 assert!(worker.ready.read().contains(transmission_id));
1012 }
1013
1014 #[tokio::test]
1015 async fn test_storage_gc_on_initialization() {
1016 let rng = &mut TestRng::default();
1017
1018 for _ in 0..ITERATIONS {
1019 let max_gc_rounds = rng.random_range(50..=100);
1021 let latest_ledger_round = rng.random_range((max_gc_rounds + 1)..1000);
1022 let expected_gc_round = latest_ledger_round - max_gc_rounds;
1023
1024 let committee =
1026 snarkvm::ledger::committee::test_helpers::sample_committee_for_round(latest_ledger_round, rng);
1027
1028 let mut mock_ledger = MockLedger::default();
1030 mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
1031
1032 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
1033 let storage =
1035 Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds)
1036 .unwrap();
1037
1038 assert_eq!(storage.gc_round(), expected_gc_round);
1040 }
1041 }
1042}
1043
/// Property tests for [`Worker`] construction.
#[cfg(test)]
mod prop_tests {
    use super::*;
    // The parent module imports `Gateway` only under `cfg(not(test))`, so `super::*` does not
    // bring it into scope here.
    use crate::Gateway;
    use snarkos_node_bft_ledger_service::MockLedgerService;
    use snarkvm::{
        console::account::Address,
        ledger::committee::{Committee, MIN_VALIDATOR_STAKE},
    };

    use rand::RngExt;
    use test_strategy::proptest;

    type CurrentNetwork = snarkvm::prelude::MainnetV0;

    /// Builds a committee with `n` members for round 1.
    ///
    /// Each member gets `MIN_VALIDATOR_STAKE`, a `false` flag, and a random value in `0..100`
    /// (presumably stake, open-flag and commission; confirm against `Committee::new`).
    ///
    /// # Panics
    /// Panics if `Committee::new` rejects the generated members.
    fn new_test_committee(n: u16) -> Committee<CurrentNetwork> {
        let mut members = IndexMap::with_capacity(usize::from(n));
        for i in 0..n {
            // One RNG per member, created from the member's index.
            let rng = &mut TestRng::fixed(u64::from(i));
            let address = Address::new(rng.random());
            info!("Validator {i}: {address}");
            members.insert(address, (MIN_VALIDATOR_STAKE, false, rng.random_range(0..100)));
        }
        Committee::<CurrentNetwork>::new(1u64, members).unwrap()
    }

    /// Every ID below `MAX_WORKERS` must yield a worker that reports that same ID.
    #[proptest]
    fn worker_initialization(
        #[strategy(0..MAX_WORKERS)] id: u8,
        gateway: Gateway<CurrentNetwork>,
        storage: Storage<CurrentNetwork>,
    ) {
        let committee = new_test_committee(4);
        let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(MockLedgerService::new(committee));
        let worker = Worker::new(id, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
        assert_eq!(worker.id(), id);
    }

    /// Every ID at or above `MAX_WORKERS` must be rejected with the documented error message.
    #[proptest]
    fn invalid_worker_id(
        #[strategy(MAX_WORKERS..)] id: u8,
        gateway: Gateway<CurrentNetwork>,
        storage: Storage<CurrentNetwork>,
    ) {
        let committee = new_test_committee(4);
        let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(MockLedgerService::new(committee));
        // `Worker` does not implement `Debug`, so `unwrap_err` is unavailable. Match explicitly
        // so that an unexpected `Ok` fails the property instead of passing silently.
        match Worker::new(id, Arc::new(gateway), storage, ledger, Default::default()) {
            Ok(_) => panic!("Worker::new accepted out-of-range worker ID '{id}'"),
            Err(error) => assert_eq!(error.to_string(), format!("Invalid worker ID '{id}'")),
        }
    }
}