1use crate::{
17 MAX_FETCH_TIMEOUT_IN_MS,
18 MAX_WORKERS,
19 ProposedBatch,
20 Transport,
21 events::{Event, TransmissionRequest, TransmissionResponse},
22 helpers::{Pending, Ready, Storage, WorkerReceiver, fmt_id, max_redundant_requests},
23 spawn_blocking,
24};
25use snarkos_node_bft_ledger_service::LedgerService;
26use snarkvm::{
27 console::prelude::*,
28 ledger::{
29 block::Transaction,
30 narwhal::{BatchHeader, Data, Transmission, TransmissionID},
31 puzzle::{Solution, SolutionID},
32 },
33};
34
35use colored::Colorize;
36use indexmap::{IndexMap, IndexSet};
37#[cfg(feature = "locktick")]
38use locktick::parking_lot::{Mutex, RwLock};
39#[cfg(not(feature = "locktick"))]
40use parking_lot::{Mutex, RwLock};
41use rand::seq::IteratorRandom;
42use std::{future::Future, net::SocketAddr, sync::Arc, time::Duration};
43use tokio::{sync::oneshot, task::JoinHandle, time::timeout};
44
45#[derive(Clone)]
48pub struct Worker<N: Network> {
49 id: u8,
51 gateway: Arc<dyn Transport<N>>,
53 storage: Storage<N>,
55 ledger: Arc<dyn LedgerService<N>>,
57 proposed_batch: Arc<ProposedBatch<N>>,
59 ready: Arc<RwLock<Ready<N>>>,
61 pending: Arc<Pending<TransmissionID<N>, Transmission<N>>>,
63 handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
65}
66
67impl<N: Network> Worker<N> {
68 pub fn new(
70 id: u8,
71 gateway: Arc<dyn Transport<N>>,
72 storage: Storage<N>,
73 ledger: Arc<dyn LedgerService<N>>,
74 proposed_batch: Arc<ProposedBatch<N>>,
75 ) -> Result<Self> {
76 ensure!(id < MAX_WORKERS, "Invalid worker ID '{id}'");
78 Ok(Self {
80 id,
81 gateway,
82 storage,
83 ledger,
84 proposed_batch,
85 ready: Default::default(),
86 pending: Default::default(),
87 handles: Default::default(),
88 })
89 }
90
91 pub fn run(&self, receiver: WorkerReceiver<N>) {
93 info!("Starting worker instance {} of the memory pool...", self.id);
94 self.start_handlers(receiver);
96 }
97
98 pub const fn id(&self) -> u8 {
100 self.id
101 }
102
103 pub fn pending(&self) -> &Arc<Pending<TransmissionID<N>, Transmission<N>>> {
105 &self.pending
106 }
107}
108
109impl<N: Network> Worker<N> {
110 pub const MAX_TRANSMISSIONS_PER_WORKER: usize =
112 BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH / MAX_WORKERS as usize;
113 pub const MAX_TRANSMISSIONS_PER_WORKER_PING: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH / 10;
115
116 pub fn num_transmissions(&self) -> usize {
118 self.ready.read().num_transmissions()
119 }
120
121 pub fn num_ratifications(&self) -> usize {
123 self.ready.read().num_ratifications()
124 }
125
126 pub fn num_solutions(&self) -> usize {
128 self.ready.read().num_solutions()
129 }
130
131 pub fn num_transactions(&self) -> usize {
133 self.ready.read().num_transactions()
134 }
135}
136
137impl<N: Network> Worker<N> {
138 pub fn transmission_ids(&self) -> IndexSet<TransmissionID<N>> {
140 self.ready.read().transmission_ids()
141 }
142
143 pub fn transmissions(&self) -> IndexMap<TransmissionID<N>, Transmission<N>> {
145 self.ready.read().transmissions()
146 }
147
148 pub fn solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
150 self.ready.read().solutions().into_iter()
151 }
152
153 pub fn transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
155 self.ready.read().transactions().into_iter()
156 }
157}
158
159impl<N: Network> Worker<N> {
160 pub(super) fn clear_solutions(&self) {
162 self.ready.write().clear_solutions()
163 }
164}
165
166impl<N: Network> Worker<N> {
167 pub fn contains_transmission(&self, transmission_id: impl Into<TransmissionID<N>>) -> bool {
169 let transmission_id = transmission_id.into();
170 self.ready.read().contains(transmission_id)
172 || self.proposed_batch.read().as_ref().is_some_and(|p| p.contains_transmission(transmission_id))
173 || self.storage.contains_transmission(transmission_id)
174 || self.ledger.contains_transmission(&transmission_id).unwrap_or(false)
175 }
176
177 pub fn get_transmission(&self, transmission_id: TransmissionID<N>) -> Option<Transmission<N>> {
182 if let Some(transmission) = self.ready.read().get(transmission_id) {
184 return Some(transmission);
185 }
186 if let Some(transmission) = self.storage.get_transmission(transmission_id) {
188 return Some(transmission);
189 }
190 if let Some(transmission) =
192 self.proposed_batch.read().as_ref().and_then(|p| p.get_transmission(transmission_id))
193 {
194 return Some(transmission.clone());
195 }
196 None
197 }
198
199 pub async fn get_or_fetch_transmission(
201 &self,
202 peer_ip: SocketAddr,
203 transmission_id: TransmissionID<N>,
204 ) -> Result<(TransmissionID<N>, Transmission<N>)> {
205 if let Some(transmission) = self.get_transmission(transmission_id) {
207 return Ok((transmission_id, transmission));
208 }
209 let (candidate_id, transmission) = self.send_transmission_request(peer_ip, transmission_id).await?;
211 ensure!(candidate_id == transmission_id, "Invalid transmission ID");
213 Ok((transmission_id, transmission))
215 }
216
217 pub(crate) fn insert_front(&self, key: TransmissionID<N>, value: Transmission<N>) {
219 self.ready.write().insert_front(key, value);
220 }
221
222 pub(crate) fn remove_front(&self) -> Option<(TransmissionID<N>, Transmission<N>)> {
224 self.ready.write().remove_front()
225 }
226
227 pub(crate) fn reinsert(&self, transmission_id: TransmissionID<N>, transmission: Transmission<N>) -> bool {
229 if !self.contains_transmission(transmission_id) {
231 return self.ready.write().insert(transmission_id, transmission);
233 }
234 false
235 }
236
237 pub(crate) fn broadcast_ping(&self) {
239 let transmission_ids = self
241 .ready
242 .read()
243 .transmission_ids()
244 .into_iter()
245 .choose_multiple(&mut rand::thread_rng(), Self::MAX_TRANSMISSIONS_PER_WORKER_PING)
246 .into_iter()
247 .collect::<IndexSet<_>>();
248
249 if !transmission_ids.is_empty() {
251 self.gateway.broadcast(Event::WorkerPing(transmission_ids.into()));
252 }
253 }
254}
255
256impl<N: Network> Worker<N> {
257 fn process_transmission_id_from_ping(&self, peer_ip: SocketAddr, transmission_id: TransmissionID<N>) {
259 if self.contains_transmission(transmission_id) {
261 return;
262 }
263 if self.ready.read().num_transmissions() > Self::MAX_TRANSMISSIONS_PER_WORKER {
266 return;
267 }
268 let self_ = self.clone();
270 tokio::spawn(async move {
271 match self_.send_transmission_request(peer_ip, transmission_id).await {
273 Ok((candidate_id, transmission)) => {
275 if candidate_id == transmission_id {
277 self_.process_transmission_from_peer(peer_ip, transmission_id, transmission);
281 }
282 }
283 Err(e) => {
285 warn!(
286 "Worker {} - Failed to fetch transmission '{}.{}' from '{peer_ip}' (ping) - {e}",
287 self_.id,
288 fmt_id(transmission_id),
289 fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
290 );
291 }
292 }
293 });
294 }
295
296 pub(crate) fn process_transmission_from_peer(
298 &self,
299 peer_ip: SocketAddr,
300 transmission_id: TransmissionID<N>,
301 transmission: Transmission<N>,
302 ) {
303 if self.contains_transmission(transmission_id) {
305 return;
306 }
307 let is_well_formed = match (&transmission_id, &transmission) {
309 (TransmissionID::Solution(_, _), Transmission::Solution(_)) => true,
310 (TransmissionID::Transaction(_, _), Transmission::Transaction(_)) => true,
311 (TransmissionID::Ratification, Transmission::Ratification) => false,
314 _ => false,
316 };
317 if let (TransmissionID::Transaction(tx_id, _), Transmission::Transaction(Data::Object(tx))) =
320 (transmission_id, &transmission)
321 {
322 if tx.is_execute() {
323 let self_ = self.clone();
324 let tx_ = tx.clone();
325 tokio::spawn(async move {
326 let _ = self_.ledger.check_transaction_basic(tx_id, tx_).await;
327 });
328 }
329 }
330 if is_well_formed && self.ready.write().insert(transmission_id, transmission) {
332 trace!(
333 "Worker {} - Added transmission '{}.{}' from '{peer_ip}'",
334 self.id,
335 fmt_id(transmission_id),
336 fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
337 );
338 }
339 }
340
341 pub(crate) async fn process_unconfirmed_solution(
344 &self,
345 solution_id: SolutionID<N>,
346 solution: Data<Solution<N>>,
347 ) -> Result<()> {
348 let transmission = Transmission::Solution(solution.clone());
350 let checksum = solution.to_checksum::<N>()?;
352 let transmission_id = TransmissionID::Solution(solution_id, checksum);
354 self.pending.remove(transmission_id, Some(transmission.clone()));
356 if self.contains_transmission(transmission_id) {
358 bail!("Solution '{}.{}' already exists.", fmt_id(solution_id), fmt_id(checksum).dimmed());
359 }
360 self.ledger.check_solution_basic(solution_id, solution).await?;
362 if self.ready.write().insert(transmission_id, transmission) {
364 trace!(
365 "Worker {} - Added unconfirmed solution '{}.{}'",
366 self.id,
367 fmt_id(solution_id),
368 fmt_id(checksum).dimmed()
369 );
370 }
371 Ok(())
372 }
373
374 pub(crate) async fn process_unconfirmed_transaction(
376 &self,
377 transaction_id: N::TransactionID,
378 transaction: Data<Transaction<N>>,
379 ) -> Result<()> {
380 let transmission = Transmission::Transaction(transaction.clone());
382 let checksum = transaction.to_checksum::<N>()?;
384 let transmission_id = TransmissionID::Transaction(transaction_id, checksum);
386 self.pending.remove(transmission_id, Some(transmission.clone()));
388 if self.contains_transmission(transmission_id) {
390 bail!("Transaction '{}.{}' already exists.", fmt_id(transaction_id), fmt_id(checksum).dimmed());
391 }
392 let transaction = spawn_blocking!({
394 match transaction {
395 Data::Object(transaction) => Ok(transaction),
396 Data::Buffer(bytes) => Ok(Transaction::<N>::read_le(&mut bytes.take(N::MAX_TRANSACTION_SIZE as u64))?),
397 }
398 })?;
399
400 self.ledger.check_transaction_basic(transaction_id, transaction).await?;
402 if self.ready.write().insert(transmission_id, transmission) {
404 trace!(
405 "Worker {}.{} - Added unconfirmed transaction '{}'",
406 self.id,
407 fmt_id(transaction_id),
408 fmt_id(checksum).dimmed()
409 );
410 }
411 Ok(())
412 }
413}
414
415impl<N: Network> Worker<N> {
416 fn start_handlers(&self, receiver: WorkerReceiver<N>) {
418 let WorkerReceiver { mut rx_worker_ping, mut rx_transmission_request, mut rx_transmission_response } = receiver;
419
420 let self_ = self.clone();
422 self.spawn(async move {
423 loop {
424 tokio::time::sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await;
426
427 let self__ = self_.clone();
429 let _ = spawn_blocking!({
430 self__.pending.clear_expired_callbacks();
431 Ok(())
432 });
433 }
434 });
435
436 let self_ = self.clone();
438 self.spawn(async move {
439 while let Some((peer_ip, transmission_id)) = rx_worker_ping.recv().await {
440 self_.process_transmission_id_from_ping(peer_ip, transmission_id);
441 }
442 });
443
444 let self_ = self.clone();
446 self.spawn(async move {
447 while let Some((peer_ip, transmission_request)) = rx_transmission_request.recv().await {
448 self_.send_transmission_response(peer_ip, transmission_request);
449 }
450 });
451
452 let self_ = self.clone();
454 self.spawn(async move {
455 while let Some((peer_ip, transmission_response)) = rx_transmission_response.recv().await {
456 let self__ = self_.clone();
458 let _ = spawn_blocking!({
459 self__.finish_transmission_request(peer_ip, transmission_response);
460 Ok(())
461 });
462 }
463 });
464 }
465
466 async fn send_transmission_request(
468 &self,
469 peer_ip: SocketAddr,
470 transmission_id: TransmissionID<N>,
471 ) -> Result<(TransmissionID<N>, Transmission<N>)> {
472 let (callback_sender, callback_receiver) = oneshot::channel();
474 let num_sent_requests = self.pending.num_sent_requests(transmission_id);
476 let contains_peer_with_sent_request = self.pending.contains_peer_with_sent_request(transmission_id, peer_ip);
478 let num_redundant_requests = max_redundant_requests(self.ledger.clone(), self.storage.current_round())?;
480 let should_send_request = num_sent_requests < num_redundant_requests && !contains_peer_with_sent_request;
483
484 self.pending.insert(transmission_id, peer_ip, Some((callback_sender, should_send_request)));
486
487 if should_send_request {
489 if self.gateway.send(peer_ip, Event::TransmissionRequest(transmission_id.into())).await.is_none() {
491 bail!("Unable to fetch transmission - failed to send request")
492 }
493 } else {
494 debug!(
495 "Skipped sending request for transmission {}.{} to '{peer_ip}' ({num_sent_requests} redundant requests)",
496 fmt_id(transmission_id),
497 fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
498 );
499 }
500 match timeout(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS), callback_receiver).await {
502 Ok(result) => Ok((transmission_id, result?)),
504 Err(e) => bail!("Unable to fetch transmission - (timeout) {e}"),
506 }
507 }
508
509 fn finish_transmission_request(&self, peer_ip: SocketAddr, response: TransmissionResponse<N>) {
512 let TransmissionResponse { transmission_id, mut transmission } = response;
513 let exists = self.pending.get_peers(transmission_id).unwrap_or_default().contains(&peer_ip);
515 if exists {
517 match self.ledger.ensure_transmission_is_well_formed(transmission_id, &mut transmission) {
519 Ok(()) => {
520 self.pending.remove(transmission_id, Some(transmission));
522 }
523 Err(err) => warn!("Failed to finish transmission response from peer '{peer_ip}': {err}"),
524 };
525 }
526 }
527
528 fn send_transmission_response(&self, peer_ip: SocketAddr, request: TransmissionRequest<N>) {
530 let TransmissionRequest { transmission_id } = request;
531 if let Some(transmission) = self.get_transmission(transmission_id) {
533 let self_ = self.clone();
535 tokio::spawn(async move {
536 self_.gateway.send(peer_ip, Event::TransmissionResponse((transmission_id, transmission).into())).await;
537 });
538 }
539 }
540
541 fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
543 self.handles.lock().push(tokio::spawn(future));
544 }
545
546 pub(crate) fn shut_down(&self) {
548 trace!("Shutting down worker {}...", self.id);
549 self.handles.lock().iter().for_each(|handle| handle.abort());
551 }
552}
553
554#[cfg(test)]
555mod tests {
556 use super::*;
557 use crate::helpers::CALLBACK_EXPIRATION_IN_SECS;
558 use snarkos_node_bft_ledger_service::LedgerService;
559 use snarkos_node_bft_storage_service::BFTMemoryService;
560 use snarkvm::{
561 console::{network::Network, types::Field},
562 ledger::{
563 block::Block,
564 committee::Committee,
565 narwhal::{BatchCertificate, Subdag, Transmission, TransmissionID},
566 test_helpers::sample_execution_transaction_with_fee,
567 },
568 prelude::Address,
569 };
570
571 use bytes::Bytes;
572 use indexmap::IndexMap;
573 use mockall::mock;
574 use std::{io, ops::Range};
575
576 type CurrentNetwork = snarkvm::prelude::MainnetV0;
577
578 const ITERATIONS: usize = 100;
579
580 mock! {
581 Gateway<N: Network> {}
582 #[async_trait]
583 impl<N:Network> Transport<N> for Gateway<N> {
584 fn broadcast(&self, event: Event<N>);
585 async fn send(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>>;
586 }
587 }
588
589 mock! {
590 #[derive(Debug)]
591 Ledger<N: Network> {}
592 #[async_trait]
593 impl<N: Network> LedgerService<N> for Ledger<N> {
594 fn latest_round(&self) -> u64;
595 fn latest_block_height(&self) -> u32;
596 fn latest_block(&self) -> Block<N>;
597 fn latest_restrictions_id(&self) -> Field<N>;
598 fn latest_leader(&self) -> Option<(u64, Address<N>)>;
599 fn update_latest_leader(&self, round: u64, leader: Address<N>);
600 fn contains_block_height(&self, height: u32) -> bool;
601 fn get_block_height(&self, hash: &N::BlockHash) -> Result<u32>;
602 fn get_block_hash(&self, height: u32) -> Result<N::BlockHash>;
603 fn get_block_round(&self, height: u32) -> Result<u64>;
604 fn get_block(&self, height: u32) -> Result<Block<N>>;
605 fn get_blocks(&self, heights: Range<u32>) -> Result<Vec<Block<N>>>;
606 fn get_solution(&self, solution_id: &SolutionID<N>) -> Result<Solution<N>>;
607 fn get_unconfirmed_transaction(&self, transaction_id: N::TransactionID) -> Result<Transaction<N>>;
608 fn get_batch_certificate(&self, certificate_id: &Field<N>) -> Result<BatchCertificate<N>>;
609 fn current_committee(&self) -> Result<Committee<N>>;
610 fn get_committee_for_round(&self, round: u64) -> Result<Committee<N>>;
611 fn get_committee_lookback_for_round(&self, round: u64) -> Result<Committee<N>>;
612 fn contains_certificate(&self, certificate_id: &Field<N>) -> Result<bool>;
613 fn contains_transmission(&self, transmission_id: &TransmissionID<N>) -> Result<bool>;
614 fn ensure_transmission_is_well_formed(
615 &self,
616 transmission_id: TransmissionID<N>,
617 transmission: &mut Transmission<N>,
618 ) -> Result<()>;
619 async fn check_solution_basic(
620 &self,
621 solution_id: SolutionID<N>,
622 solution: Data<Solution<N>>,
623 ) -> Result<()>;
624 async fn check_transaction_basic(
625 &self,
626 transaction_id: N::TransactionID,
627 transaction: Transaction<N>,
628 ) -> Result<()>;
629 fn check_next_block(&self, block: &Block<N>) -> Result<()>;
630 fn prepare_advance_to_next_quorum_block(
631 &self,
632 subdag: Subdag<N>,
633 transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
634 ) -> Result<Block<N>>;
635 fn advance_to_next_block(&self, block: &Block<N>) -> Result<()>;
636 fn transaction_spend_in_microcredits(&self, transaction: &Transaction<N>, consensus_version: ConsensusVersion) -> Result<u64>;
637 }
638 }
639
640 #[tokio::test]
641 async fn test_max_redundant_requests() {
642 let num_nodes: u16 = CurrentNetwork::MAX_CERTIFICATES.first().unwrap().1;
643
644 let rng = &mut TestRng::default();
645 let committee =
647 snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_size(0, num_nodes, rng);
648 let committee_clone = committee.clone();
649 let mut mock_ledger = MockLedger::default();
651 mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
652 mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
653 mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
654 mock_ledger.expect_check_solution_basic().returning(|_, _| Ok(()));
655 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
656
657 assert_eq!(max_redundant_requests(ledger, 0).unwrap(), 6, "Update me if the formula changes");
659 }
660
661 #[tokio::test]
662 async fn test_process_transmission() {
663 let rng = &mut TestRng::default();
664 let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
666 let committee_clone = committee.clone();
667 let gateway = MockGateway::default();
669 let mut mock_ledger = MockLedger::default();
670 mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
671 mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
672 mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
673 mock_ledger.expect_check_solution_basic().returning(|_, _| Ok(()));
674 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
675 let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
677
678 let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
680 let data =
681 |rng: &mut TestRng| Data::Buffer(Bytes::from((0..512).map(|_| rng.r#gen::<u8>()).collect::<Vec<_>>()));
682 let transmission_id = TransmissionID::Solution(
683 rng.r#gen::<u64>().into(),
684 rng.r#gen::<<CurrentNetwork as Network>::TransmissionChecksum>(),
685 );
686 let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
687 let transmission = Transmission::Solution(data(rng));
688
689 worker.process_transmission_from_peer(peer_ip, transmission_id, transmission.clone());
691 assert!(worker.contains_transmission(transmission_id));
692 assert!(worker.ready.read().contains(transmission_id));
693 assert_eq!(worker.get_transmission(transmission_id), Some(transmission));
694 assert!(worker.ready.write().remove_front().is_some());
696 assert!(!worker.ready.read().contains(transmission_id));
697 }
698
699 #[tokio::test]
700 async fn test_send_transmission() {
701 let rng = &mut TestRng::default();
702 let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
704 let committee_clone = committee.clone();
705 let mut gateway = MockGateway::default();
707 gateway.expect_send().returning(|_, _| {
708 let (_tx, rx) = oneshot::channel();
709 Some(rx)
710 });
711 let mut mock_ledger = MockLedger::default();
712 mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
713 mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
714 mock_ledger.expect_ensure_transmission_is_well_formed().returning(|_, _| Ok(()));
715 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
716 let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
718
719 let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
721 let transmission_id = TransmissionID::Solution(
722 rng.r#gen::<u64>().into(),
723 rng.r#gen::<<CurrentNetwork as Network>::TransmissionChecksum>(),
724 );
725 let worker_ = worker.clone();
726 let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
727 let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
728 assert!(worker.pending.contains(transmission_id));
729 let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
730 worker.finish_transmission_request(peer_ip, TransmissionResponse {
732 transmission_id,
733 transmission: Transmission::Solution(Data::Buffer(Bytes::from(vec![0; 512]))),
734 });
735 assert!(!worker.pending.contains(transmission_id));
737 }
738
739 #[tokio::test]
740 async fn test_process_solution_ok() {
741 let rng = &mut TestRng::default();
742 let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
744 let committee_clone = committee.clone();
745 let mut gateway = MockGateway::default();
747 gateway.expect_send().returning(|_, _| {
748 let (_tx, rx) = oneshot::channel();
749 Some(rx)
750 });
751 let mut mock_ledger = MockLedger::default();
752 mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
753 mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
754 mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
755 mock_ledger.expect_check_solution_basic().returning(|_, _| Ok(()));
756 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
757 let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
759
760 let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
762 let solution = Data::Buffer(Bytes::from((0..512).map(|_| rng.r#gen::<u8>()).collect::<Vec<_>>()));
763 let solution_id = rng.r#gen::<u64>().into();
764 let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
765 let transmission_id = TransmissionID::Solution(solution_id, solution_checksum);
766 let worker_ = worker.clone();
767 let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
768 let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
769 assert!(worker.pending.contains(transmission_id));
770 let result = worker.process_unconfirmed_solution(solution_id, solution).await;
771 assert!(result.is_ok());
772 assert!(!worker.pending.contains(transmission_id));
773 assert!(worker.ready.read().contains(transmission_id));
774 }
775
776 #[tokio::test]
777 async fn test_process_solution_nok() {
778 let rng = &mut TestRng::default();
779 let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
781 let committee_clone = committee.clone();
782 let mut gateway = MockGateway::default();
784 gateway.expect_send().returning(|_, _| {
785 let (_tx, rx) = oneshot::channel();
786 Some(rx)
787 });
788 let mut mock_ledger = MockLedger::default();
789 mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
790 mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
791 mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
792 mock_ledger.expect_check_solution_basic().returning(|_, _| Err(anyhow!("")));
793 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
794 let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
796
797 let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
799 let solution_id = rng.r#gen::<u64>().into();
800 let solution = Data::Buffer(Bytes::from((0..512).map(|_| rng.r#gen::<u8>()).collect::<Vec<_>>()));
801 let checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
802 let transmission_id = TransmissionID::Solution(solution_id, checksum);
803 let worker_ = worker.clone();
804 let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
805 let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
806 assert!(worker.pending.contains(transmission_id));
807 let result = worker.process_unconfirmed_solution(solution_id, solution).await;
808 assert!(result.is_err());
809 assert!(!worker.pending.contains(transmission_id));
810 assert!(!worker.ready.read().contains(transmission_id));
811 }
812
813 #[tokio::test]
814 async fn test_process_transaction_ok() {
815 let rng = &mut TestRng::default();
816 let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
818 let committee_clone = committee.clone();
819 let mut gateway = MockGateway::default();
821 gateway.expect_send().returning(|_, _| {
822 let (_tx, rx) = oneshot::channel();
823 Some(rx)
824 });
825 let mut mock_ledger = MockLedger::default();
826 mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
827 mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
828 mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
829 mock_ledger.expect_check_transaction_basic().returning(|_, _| Ok(()));
830 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
831 let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
833
834 let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
836 let transaction = sample_execution_transaction_with_fee(false, rng, 0);
837 let transaction_id = transaction.id();
838 let transaction_data = Data::Object(transaction);
839 let checksum = transaction_data.to_checksum::<CurrentNetwork>().unwrap();
840 let transmission_id = TransmissionID::Transaction(transaction_id, checksum);
841 let worker_ = worker.clone();
842 let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
843 let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
844 assert!(worker.pending.contains(transmission_id));
845 let result = worker.process_unconfirmed_transaction(transaction_id, transaction_data).await;
846 assert!(result.is_ok());
847 assert!(!worker.pending.contains(transmission_id));
848 assert!(worker.ready.read().contains(transmission_id));
849 }
850
851 #[tokio::test]
852 async fn test_process_transaction_nok() {
853 let mut rng = &mut TestRng::default();
854 let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
856 let committee_clone = committee.clone();
857 let mut gateway = MockGateway::default();
859 gateway.expect_send().returning(|_, _| {
860 let (_tx, rx) = oneshot::channel();
861 Some(rx)
862 });
863 let mut mock_ledger = MockLedger::default();
864 mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
865 mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
866 mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
867 mock_ledger.expect_check_transaction_basic().returning(|_, _| Err(anyhow!("")));
868 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
869 let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
871
872 let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
874 let transaction_id: <CurrentNetwork as Network>::TransactionID = Field::<CurrentNetwork>::rand(&mut rng).into();
875 let transaction = Data::Buffer(Bytes::from((0..512).map(|_| rng.r#gen::<u8>()).collect::<Vec<_>>()));
876 let checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
877 let transmission_id = TransmissionID::Transaction(transaction_id, checksum);
878 let worker_ = worker.clone();
879 let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
880 let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
881 assert!(worker.pending.contains(transmission_id));
882 let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
883 assert!(result.is_err());
884 assert!(!worker.pending.contains(transmission_id));
885 assert!(!worker.ready.read().contains(transmission_id));
886 }
887
888 #[tokio::test]
889 async fn test_flood_transmission_requests() {
890 let rng = &mut TestRng::default();
891 let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
893 let committee_clone = committee.clone();
894 let mut gateway = MockGateway::default();
896 gateway.expect_send().returning(|_, _| {
897 let (_tx, rx) = oneshot::channel();
898 Some(rx)
899 });
900 let mut mock_ledger = MockLedger::default();
901 mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
902 mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
903 mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
904 mock_ledger.expect_check_transaction_basic().returning(|_, _| Ok(()));
905 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
906 let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
908
909 let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
911 let transaction = sample_execution_transaction_with_fee(false, rng, 0);
912 let transaction_id = transaction.id();
913 let transaction_data = Data::Object(transaction);
914 let checksum = transaction_data.to_checksum::<CurrentNetwork>().unwrap();
915 let transmission_id = TransmissionID::Transaction(transaction_id, checksum);
916
917 let num_redundant_requests =
919 max_redundant_requests(worker.ledger.clone(), worker.storage.current_round()).unwrap();
920 let num_flood_requests = num_redundant_requests * 10;
921 let mut peer_ips =
922 (0..num_flood_requests).map(|i| SocketAddr::from(([127, 0, 0, 1], 1234 + i as u16))).collect_vec();
923 let first_peer_ip = peer_ips[0];
924
925 for i in 1..=num_flood_requests {
927 let worker_ = worker.clone();
928 let peer_ip = peer_ips.pop().unwrap();
929 tokio::spawn(async move {
930 let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
931 });
932 tokio::time::sleep(Duration::from_millis(10)).await;
933 assert!(worker.pending.num_sent_requests(transmission_id) <= num_redundant_requests);
935 assert_eq!(worker.pending.num_callbacks(transmission_id), i);
936 }
937 assert_eq!(worker.pending.num_sent_requests(transmission_id), num_redundant_requests);
939 assert_eq!(worker.pending.num_callbacks(transmission_id), num_flood_requests);
940
941 tokio::time::sleep(Duration::from_secs(CALLBACK_EXPIRATION_IN_SECS as u64 + 1)).await;
943 assert_eq!(worker.pending.num_sent_requests(transmission_id), 0);
944 assert_eq!(worker.pending.num_callbacks(transmission_id), 0);
945
946 for i in 1..=num_flood_requests {
948 let worker_ = worker.clone();
949 tokio::spawn(async move {
950 let _ = worker_.send_transmission_request(first_peer_ip, transmission_id).await;
951 });
952 tokio::time::sleep(Duration::from_millis(10)).await;
953 assert!(worker.pending.num_sent_requests(transmission_id) <= num_redundant_requests);
954 assert_eq!(worker.pending.num_callbacks(transmission_id), i);
955 }
956 assert_eq!(worker.pending.num_sent_requests(transmission_id), 1);
958 assert_eq!(worker.pending.num_callbacks(transmission_id), num_flood_requests);
959
960 let result = worker.process_unconfirmed_transaction(transaction_id, transaction_data).await;
962 assert!(result.is_ok());
963 assert_eq!(worker.pending.num_sent_requests(transmission_id), 0);
964 assert_eq!(worker.pending.num_callbacks(transmission_id), 0);
965 assert!(!worker.pending.contains(transmission_id));
966 assert!(worker.ready.read().contains(transmission_id));
967 }
968
969 #[tokio::test]
970 async fn test_storage_gc_on_initialization() {
971 let rng = &mut TestRng::default();
972
973 for _ in 0..ITERATIONS {
974 let max_gc_rounds = rng.gen_range(50..=100);
976 let latest_ledger_round = rng.gen_range((max_gc_rounds + 1)..1000);
977 let expected_gc_round = latest_ledger_round - max_gc_rounds;
978
979 let committee =
981 snarkvm::ledger::committee::test_helpers::sample_committee_for_round(latest_ledger_round, rng);
982
983 let mut mock_ledger = MockLedger::default();
985 mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
986
987 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
988 let storage =
990 Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
991
992 assert_eq!(storage.gc_round(), expected_gc_round);
994 }
995 }
996}
997
998#[cfg(test)]
999mod prop_tests {
1000 use super::*;
1001 use crate::Gateway;
1002 use snarkos_node_bft_ledger_service::MockLedgerService;
1003 use snarkvm::{
1004 console::account::Address,
1005 ledger::committee::{Committee, MIN_VALIDATOR_STAKE},
1006 };
1007
1008 use test_strategy::proptest;
1009
1010 type CurrentNetwork = snarkvm::prelude::MainnetV0;
1011
1012 fn new_test_committee(n: u16) -> Committee<CurrentNetwork> {
1014 let mut members = IndexMap::with_capacity(n as usize);
1015 for i in 0..n {
1016 let rng = &mut TestRng::fixed(i as u64);
1018 let address = Address::new(rng.r#gen());
1019 info!("Validator {i}: {address}");
1020 members.insert(address, (MIN_VALIDATOR_STAKE, false, rng.gen_range(0..100)));
1021 }
1022 Committee::<CurrentNetwork>::new(1u64, members).unwrap()
1024 }
1025
1026 #[proptest]
1027 fn worker_initialization(
1028 #[strategy(0..MAX_WORKERS)] id: u8,
1029 gateway: Gateway<CurrentNetwork>,
1030 storage: Storage<CurrentNetwork>,
1031 ) {
1032 let committee = new_test_committee(4);
1033 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(MockLedgerService::new(committee));
1034 let worker = Worker::new(id, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
1035 assert_eq!(worker.id(), id);
1036 }
1037
1038 #[proptest]
1039 fn invalid_worker_id(
1040 #[strategy(MAX_WORKERS..)] id: u8,
1041 gateway: Gateway<CurrentNetwork>,
1042 storage: Storage<CurrentNetwork>,
1043 ) {
1044 let committee = new_test_committee(4);
1045 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(MockLedgerService::new(committee));
1046 let worker = Worker::new(id, Arc::new(gateway), storage, ledger, Default::default());
1047 if let Err(error) = worker {
1049 assert_eq!(error.to_string(), format!("Invalid worker ID '{id}'"));
1050 }
1051 }
1052}