1use crate::{
17 MAX_FETCH_TIMEOUT_IN_MS,
18 MAX_WORKERS,
19 ProposedBatch,
20 Transport,
21 events::{Event, TransmissionRequest, TransmissionResponse},
22 helpers::{Pending, Ready, Storage, WorkerReceiver, fmt_id, max_redundant_requests},
23 spawn_blocking,
24};
25use snarkos_node_bft_ledger_service::LedgerService;
26use snarkvm::{
27 console::prelude::*,
28 ledger::{
29 block::Transaction,
30 narwhal::{BatchHeader, Data, Transmission, TransmissionID},
31 puzzle::{Solution, SolutionID},
32 },
33};
34
35use colored::Colorize;
36use indexmap::{IndexMap, IndexSet};
37use parking_lot::Mutex;
38use rand::seq::IteratorRandom;
39use std::{future::Future, net::SocketAddr, sync::Arc, time::Duration};
40use tokio::{sync::oneshot, task::JoinHandle, time::timeout};
41
42#[derive(Clone)]
43pub struct Worker<N: Network> {
44 id: u8,
46 gateway: Arc<dyn Transport<N>>,
48 storage: Storage<N>,
50 ledger: Arc<dyn LedgerService<N>>,
52 proposed_batch: Arc<ProposedBatch<N>>,
54 ready: Ready<N>,
56 pending: Arc<Pending<TransmissionID<N>, Transmission<N>>>,
58 handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
60}
61
62impl<N: Network> Worker<N> {
63 pub fn new(
65 id: u8,
66 gateway: Arc<dyn Transport<N>>,
67 storage: Storage<N>,
68 ledger: Arc<dyn LedgerService<N>>,
69 proposed_batch: Arc<ProposedBatch<N>>,
70 ) -> Result<Self> {
71 ensure!(id < MAX_WORKERS, "Invalid worker ID '{id}'");
73 Ok(Self {
75 id,
76 gateway,
77 storage,
78 ledger,
79 proposed_batch,
80 ready: Default::default(),
81 pending: Default::default(),
82 handles: Default::default(),
83 })
84 }
85
86 pub fn run(&self, receiver: WorkerReceiver<N>) {
88 info!("Starting worker instance {} of the memory pool...", self.id);
89 self.start_handlers(receiver);
91 }
92
93 pub const fn id(&self) -> u8 {
95 self.id
96 }
97
98 pub fn pending(&self) -> &Arc<Pending<TransmissionID<N>, Transmission<N>>> {
100 &self.pending
101 }
102}
103
104impl<N: Network> Worker<N> {
105 pub const MAX_TRANSMISSIONS_PER_WORKER: usize =
107 BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH / MAX_WORKERS as usize;
108 pub const MAX_TRANSMISSIONS_PER_WORKER_PING: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH / 10;
110
111 pub fn num_transmissions(&self) -> usize {
115 self.ready.num_transmissions()
116 }
117
118 pub fn num_ratifications(&self) -> usize {
120 self.ready.num_ratifications()
121 }
122
123 pub fn num_solutions(&self) -> usize {
125 self.ready.num_solutions()
126 }
127
128 pub fn num_transactions(&self) -> usize {
130 self.ready.num_transactions()
131 }
132}
133
134impl<N: Network> Worker<N> {
135 pub fn transmission_ids(&self) -> IndexSet<TransmissionID<N>> {
137 self.ready.transmission_ids()
138 }
139
140 pub fn transmissions(&self) -> IndexMap<TransmissionID<N>, Transmission<N>> {
142 self.ready.transmissions()
143 }
144
145 pub fn solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
147 self.ready.solutions()
148 }
149
150 pub fn transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
152 self.ready.transactions()
153 }
154}
155
156impl<N: Network> Worker<N> {
157 pub(super) fn clear_solutions(&self) {
159 self.ready.clear_solutions()
160 }
161}
162
163impl<N: Network> Worker<N> {
164 pub fn contains_transmission(&self, transmission_id: impl Into<TransmissionID<N>>) -> bool {
166 let transmission_id = transmission_id.into();
167 self.ready.contains(transmission_id)
169 || self.proposed_batch.read().as_ref().map_or(false, |p| p.contains_transmission(transmission_id))
170 || self.storage.contains_transmission(transmission_id)
171 || self.ledger.contains_transmission(&transmission_id).unwrap_or(false)
172 }
173
174 pub fn get_transmission(&self, transmission_id: TransmissionID<N>) -> Option<Transmission<N>> {
179 if let Some(transmission) = self.ready.get(transmission_id) {
181 return Some(transmission);
182 }
183 if let Some(transmission) = self.storage.get_transmission(transmission_id) {
185 return Some(transmission);
186 }
187 if let Some(transmission) =
189 self.proposed_batch.read().as_ref().and_then(|p| p.get_transmission(transmission_id))
190 {
191 return Some(transmission.clone());
192 }
193 None
194 }
195
196 pub async fn get_or_fetch_transmission(
198 &self,
199 peer_ip: SocketAddr,
200 transmission_id: TransmissionID<N>,
201 ) -> Result<(TransmissionID<N>, Transmission<N>)> {
202 if let Some(transmission) = self.get_transmission(transmission_id) {
204 return Ok((transmission_id, transmission));
205 }
206 let (candidate_id, transmission) = self.send_transmission_request(peer_ip, transmission_id).await?;
208 ensure!(candidate_id == transmission_id, "Invalid transmission ID");
210 Ok((transmission_id, transmission))
212 }
213
214 pub(crate) fn drain(&self, num_transmissions: usize) -> impl Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
216 self.ready.drain(num_transmissions).into_iter()
217 }
218
219 pub(crate) fn reinsert(&self, transmission_id: TransmissionID<N>, transmission: Transmission<N>) -> bool {
221 if !self.contains_transmission(transmission_id) {
223 return self.ready.insert(transmission_id, transmission);
225 }
226 false
227 }
228
229 pub(crate) fn broadcast_ping(&self) {
231 let transmission_ids = self
233 .ready
234 .transmission_ids()
235 .into_iter()
236 .choose_multiple(&mut rand::thread_rng(), Self::MAX_TRANSMISSIONS_PER_WORKER_PING)
237 .into_iter()
238 .collect::<IndexSet<_>>();
239
240 if !transmission_ids.is_empty() {
242 self.gateway.broadcast(Event::WorkerPing(transmission_ids.into()));
243 }
244 }
245}
246
247impl<N: Network> Worker<N> {
248 fn process_transmission_id_from_ping(&self, peer_ip: SocketAddr, transmission_id: TransmissionID<N>) {
250 if self.contains_transmission(transmission_id) {
252 return;
253 }
254 if self.ready.num_transmissions() > Self::MAX_TRANSMISSIONS_PER_WORKER {
257 return;
258 }
259 let self_ = self.clone();
261 tokio::spawn(async move {
262 match self_.send_transmission_request(peer_ip, transmission_id).await {
264 Ok((candidate_id, transmission)) => {
266 if candidate_id == transmission_id {
268 self_.process_transmission_from_peer(peer_ip, transmission_id, transmission);
272 }
273 }
274 Err(e) => {
276 warn!(
277 "Worker {} - Failed to fetch transmission '{}.{}' from '{peer_ip}' (ping) - {e}",
278 self_.id,
279 fmt_id(transmission_id),
280 fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
281 );
282 }
283 }
284 });
285 }
286
287 pub(crate) fn process_transmission_from_peer(
289 &self,
290 peer_ip: SocketAddr,
291 transmission_id: TransmissionID<N>,
292 transmission: Transmission<N>,
293 ) {
294 if self.contains_transmission(transmission_id) {
296 return;
297 }
298 let is_well_formed = match (&transmission_id, &transmission) {
300 (TransmissionID::Solution(_, _), Transmission::Solution(_)) => true,
301 (TransmissionID::Transaction(_, _), Transmission::Transaction(_)) => true,
302 (TransmissionID::Ratification, Transmission::Ratification) => false,
305 _ => false,
307 };
308 if is_well_formed && self.ready.insert(transmission_id, transmission) {
310 trace!(
311 "Worker {} - Added transmission '{}.{}' from '{peer_ip}'",
312 self.id,
313 fmt_id(transmission_id),
314 fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
315 );
316 }
317 }
318
319 pub(crate) async fn process_unconfirmed_solution(
322 &self,
323 solution_id: SolutionID<N>,
324 solution: Data<Solution<N>>,
325 ) -> Result<()> {
326 let transmission = Transmission::Solution(solution.clone());
328 let checksum = solution.to_checksum::<N>()?;
330 let transmission_id = TransmissionID::Solution(solution_id, checksum);
332 self.pending.remove(transmission_id, Some(transmission.clone()));
334 if self.contains_transmission(transmission_id) {
336 bail!("Solution '{}.{}' already exists.", fmt_id(solution_id), fmt_id(checksum).dimmed());
337 }
338 self.ledger.check_solution_basic(solution_id, solution).await?;
340 if self.ready.insert(transmission_id, transmission) {
342 trace!(
343 "Worker {} - Added unconfirmed solution '{}.{}'",
344 self.id,
345 fmt_id(solution_id),
346 fmt_id(checksum).dimmed()
347 );
348 }
349 Ok(())
350 }
351
352 pub(crate) async fn process_unconfirmed_transaction(
354 &self,
355 transaction_id: N::TransactionID,
356 transaction: Data<Transaction<N>>,
357 ) -> Result<()> {
358 let transmission = Transmission::Transaction(transaction.clone());
360 let checksum = transaction.to_checksum::<N>()?;
362 let transmission_id = TransmissionID::Transaction(transaction_id, checksum);
364 self.pending.remove(transmission_id, Some(transmission.clone()));
366 if self.contains_transmission(transmission_id) {
368 bail!("Transaction '{}.{}' already exists.", fmt_id(transaction_id), fmt_id(checksum).dimmed());
369 }
370 self.ledger.check_transaction_basic(transaction_id, transaction).await?;
372 if self.ready.insert(transmission_id, transmission) {
374 trace!(
375 "Worker {}.{} - Added unconfirmed transaction '{}'",
376 self.id,
377 fmt_id(transaction_id),
378 fmt_id(checksum).dimmed()
379 );
380 }
381 Ok(())
382 }
383}
384
385impl<N: Network> Worker<N> {
386 fn start_handlers(&self, receiver: WorkerReceiver<N>) {
388 let WorkerReceiver { mut rx_worker_ping, mut rx_transmission_request, mut rx_transmission_response } = receiver;
389
390 let self_ = self.clone();
392 self.spawn(async move {
393 loop {
394 tokio::time::sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await;
396
397 let self__ = self_.clone();
399 let _ = spawn_blocking!({
400 self__.pending.clear_expired_callbacks();
401 Ok(())
402 });
403 }
404 });
405
406 let self_ = self.clone();
408 self.spawn(async move {
409 while let Some((peer_ip, transmission_id)) = rx_worker_ping.recv().await {
410 self_.process_transmission_id_from_ping(peer_ip, transmission_id);
411 }
412 });
413
414 let self_ = self.clone();
416 self.spawn(async move {
417 while let Some((peer_ip, transmission_request)) = rx_transmission_request.recv().await {
418 self_.send_transmission_response(peer_ip, transmission_request);
419 }
420 });
421
422 let self_ = self.clone();
424 self.spawn(async move {
425 while let Some((peer_ip, transmission_response)) = rx_transmission_response.recv().await {
426 let self__ = self_.clone();
428 let _ = spawn_blocking!({
429 self__.finish_transmission_request(peer_ip, transmission_response);
430 Ok(())
431 });
432 }
433 });
434 }
435
436 async fn send_transmission_request(
438 &self,
439 peer_ip: SocketAddr,
440 transmission_id: TransmissionID<N>,
441 ) -> Result<(TransmissionID<N>, Transmission<N>)> {
442 let (callback_sender, callback_receiver) = oneshot::channel();
444 let num_sent_requests = self.pending.num_sent_requests(transmission_id);
446 let contains_peer_with_sent_request = self.pending.contains_peer_with_sent_request(transmission_id, peer_ip);
448 let num_redundant_requests = max_redundant_requests(self.ledger.clone(), self.storage.current_round());
450 let should_send_request = num_sent_requests < num_redundant_requests && !contains_peer_with_sent_request;
453
454 self.pending.insert(transmission_id, peer_ip, Some((callback_sender, should_send_request)));
456
457 if should_send_request {
459 if self.gateway.send(peer_ip, Event::TransmissionRequest(transmission_id.into())).await.is_none() {
461 bail!("Unable to fetch transmission - failed to send request")
462 }
463 } else {
464 debug!(
465 "Skipped sending request for transmission {}.{} to '{peer_ip}' ({num_sent_requests} redundant requests)",
466 fmt_id(transmission_id),
467 fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
468 );
469 }
470 match timeout(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS), callback_receiver).await {
472 Ok(result) => Ok((transmission_id, result?)),
474 Err(e) => bail!("Unable to fetch transmission - (timeout) {e}"),
476 }
477 }
478
479 fn finish_transmission_request(&self, peer_ip: SocketAddr, response: TransmissionResponse<N>) {
482 let TransmissionResponse { transmission_id, mut transmission } = response;
483 let exists = self.pending.get_peers(transmission_id).unwrap_or_default().contains(&peer_ip);
485 if exists {
487 match self.ledger.ensure_transmission_is_well_formed(transmission_id, &mut transmission) {
489 Ok(()) => {
490 self.pending.remove(transmission_id, Some(transmission));
492 }
493 Err(err) => warn!("Failed to finish transmission response from peer '{peer_ip}': {err}"),
494 };
495 }
496 }
497
498 fn send_transmission_response(&self, peer_ip: SocketAddr, request: TransmissionRequest<N>) {
500 let TransmissionRequest { transmission_id } = request;
501 if let Some(transmission) = self.get_transmission(transmission_id) {
503 let self_ = self.clone();
505 tokio::spawn(async move {
506 self_.gateway.send(peer_ip, Event::TransmissionResponse((transmission_id, transmission).into())).await;
507 });
508 }
509 }
510
511 fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
513 self.handles.lock().push(tokio::spawn(future));
514 }
515
516 pub(crate) fn shut_down(&self) {
518 trace!("Shutting down worker {}...", self.id);
519 self.handles.lock().iter().for_each(|handle| handle.abort());
521 }
522}
523
524#[cfg(test)]
525mod tests {
526 use super::*;
527 use crate::helpers::CALLBACK_EXPIRATION_IN_SECS;
528 use snarkos_node_bft_ledger_service::LedgerService;
529 use snarkos_node_bft_storage_service::BFTMemoryService;
530 use snarkvm::{
531 console::{network::Network, types::Field},
532 ledger::{
533 block::Block,
534 committee::Committee,
535 narwhal::{BatchCertificate, Subdag, Transmission, TransmissionID},
536 },
537 prelude::Address,
538 };
539
540 use bytes::Bytes;
541 use indexmap::IndexMap;
542 use mockall::mock;
543 use std::{io, ops::Range};
544
545 type CurrentNetwork = snarkvm::prelude::MainnetV0;
546
547 const ITERATIONS: usize = 100;
548
549 mock! {
550 Gateway<N: Network> {}
551 #[async_trait]
552 impl<N:Network> Transport<N> for Gateway<N> {
553 fn broadcast(&self, event: Event<N>);
554 async fn send(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>>;
555 }
556 }
557
558 mock! {
559 #[derive(Debug)]
560 Ledger<N: Network> {}
561 #[async_trait]
562 impl<N: Network> LedgerService<N> for Ledger<N> {
563 fn latest_round(&self) -> u64;
564 fn latest_block_height(&self) -> u32;
565 fn latest_block(&self) -> Block<N>;
566 fn latest_restrictions_id(&self) -> Field<N>;
567 fn latest_leader(&self) -> Option<(u64, Address<N>)>;
568 fn update_latest_leader(&self, round: u64, leader: Address<N>);
569 fn contains_block_height(&self, height: u32) -> bool;
570 fn get_block_height(&self, hash: &N::BlockHash) -> Result<u32>;
571 fn get_block_hash(&self, height: u32) -> Result<N::BlockHash>;
572 fn get_block_round(&self, height: u32) -> Result<u64>;
573 fn get_block(&self, height: u32) -> Result<Block<N>>;
574 fn get_blocks(&self, heights: Range<u32>) -> Result<Vec<Block<N>>>;
575 fn get_solution(&self, solution_id: &SolutionID<N>) -> Result<Solution<N>>;
576 fn get_unconfirmed_transaction(&self, transaction_id: N::TransactionID) -> Result<Transaction<N>>;
577 fn get_batch_certificate(&self, certificate_id: &Field<N>) -> Result<BatchCertificate<N>>;
578 fn current_committee(&self) -> Result<Committee<N>>;
579 fn get_committee_for_round(&self, round: u64) -> Result<Committee<N>>;
580 fn get_committee_lookback_for_round(&self, round: u64) -> Result<Committee<N>>;
581 fn contains_certificate(&self, certificate_id: &Field<N>) -> Result<bool>;
582 fn contains_transmission(&self, transmission_id: &TransmissionID<N>) -> Result<bool>;
583 fn ensure_transmission_is_well_formed(
584 &self,
585 transmission_id: TransmissionID<N>,
586 transmission: &mut Transmission<N>,
587 ) -> Result<()>;
588 async fn check_solution_basic(
589 &self,
590 solution_id: SolutionID<N>,
591 solution: Data<Solution<N>>,
592 ) -> Result<()>;
593 async fn check_transaction_basic(
594 &self,
595 transaction_id: N::TransactionID,
596 transaction: Data<Transaction<N>>,
597 ) -> Result<()>;
598 fn check_next_block(&self, block: &Block<N>) -> Result<()>;
599 fn prepare_advance_to_next_quorum_block(
600 &self,
601 subdag: Subdag<N>,
602 transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
603 ) -> Result<Block<N>>;
604 fn advance_to_next_block(&self, block: &Block<N>) -> Result<()>;
605 }
606 }
607
608 #[tokio::test]
609 async fn test_max_redundant_requests() {
610 const NUM_NODES: u16 = Committee::<CurrentNetwork>::MAX_COMMITTEE_SIZE;
611
612 let rng = &mut TestRng::default();
613 let committee =
615 snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_size(0, NUM_NODES, rng);
616 let committee_clone = committee.clone();
617 let mut mock_ledger = MockLedger::default();
619 mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
620 mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
621 mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
622 mock_ledger.expect_check_solution_basic().returning(|_, _| Ok(()));
623 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
624
625 assert_eq!(max_redundant_requests(ledger, 0), 34, "Update me if the formula changes");
627 }
628
629 #[tokio::test]
630 async fn test_process_transmission() {
631 let rng = &mut TestRng::default();
632 let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
634 let committee_clone = committee.clone();
635 let gateway = MockGateway::default();
637 let mut mock_ledger = MockLedger::default();
638 mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
639 mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
640 mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
641 mock_ledger.expect_check_solution_basic().returning(|_, _| Ok(()));
642 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
643 let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
645
646 let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
648 let data = |rng: &mut TestRng| Data::Buffer(Bytes::from((0..512).map(|_| rng.gen::<u8>()).collect::<Vec<_>>()));
649 let transmission_id = TransmissionID::Solution(
650 rng.gen::<u64>().into(),
651 rng.gen::<<CurrentNetwork as Network>::TransmissionChecksum>(),
652 );
653 let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
654 let transmission = Transmission::Solution(data(rng));
655
656 worker.process_transmission_from_peer(peer_ip, transmission_id, transmission.clone());
658 assert!(worker.contains_transmission(transmission_id));
659 assert!(worker.ready.contains(transmission_id));
660 assert_eq!(worker.get_transmission(transmission_id), Some(transmission));
661 let transmission: Vec<_> = worker.drain(1).collect();
663 assert_eq!(transmission.len(), 1);
664 assert!(!worker.ready.contains(transmission_id));
665 }
666
667 #[tokio::test]
668 async fn test_send_transmission() {
669 let rng = &mut TestRng::default();
670 let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
672 let committee_clone = committee.clone();
673 let mut gateway = MockGateway::default();
675 gateway.expect_send().returning(|_, _| {
676 let (_tx, rx) = oneshot::channel();
677 Some(rx)
678 });
679 let mut mock_ledger = MockLedger::default();
680 mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
681 mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
682 mock_ledger.expect_ensure_transmission_is_well_formed().returning(|_, _| Ok(()));
683 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
684 let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
686
687 let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
689 let transmission_id = TransmissionID::Solution(
690 rng.gen::<u64>().into(),
691 rng.gen::<<CurrentNetwork as Network>::TransmissionChecksum>(),
692 );
693 let worker_ = worker.clone();
694 let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
695 let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
696 assert!(worker.pending.contains(transmission_id));
697 let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
698 worker.finish_transmission_request(peer_ip, TransmissionResponse {
700 transmission_id,
701 transmission: Transmission::Solution(Data::Buffer(Bytes::from(vec![0; 512]))),
702 });
703 assert!(!worker.pending.contains(transmission_id));
705 }
706
707 #[tokio::test]
708 async fn test_process_solution_ok() {
709 let rng = &mut TestRng::default();
710 let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
712 let committee_clone = committee.clone();
713 let mut gateway = MockGateway::default();
715 gateway.expect_send().returning(|_, _| {
716 let (_tx, rx) = oneshot::channel();
717 Some(rx)
718 });
719 let mut mock_ledger = MockLedger::default();
720 mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
721 mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
722 mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
723 mock_ledger.expect_check_solution_basic().returning(|_, _| Ok(()));
724 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
725 let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
727
728 let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
730 let solution = Data::Buffer(Bytes::from((0..512).map(|_| rng.gen::<u8>()).collect::<Vec<_>>()));
731 let solution_id = rng.gen::<u64>().into();
732 let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
733 let transmission_id = TransmissionID::Solution(solution_id, solution_checksum);
734 let worker_ = worker.clone();
735 let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
736 let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
737 assert!(worker.pending.contains(transmission_id));
738 let result = worker.process_unconfirmed_solution(solution_id, solution).await;
739 assert!(result.is_ok());
740 assert!(!worker.pending.contains(transmission_id));
741 assert!(worker.ready.contains(transmission_id));
742 }
743
744 #[tokio::test]
745 async fn test_process_solution_nok() {
746 let rng = &mut TestRng::default();
747 let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
749 let committee_clone = committee.clone();
750 let mut gateway = MockGateway::default();
752 gateway.expect_send().returning(|_, _| {
753 let (_tx, rx) = oneshot::channel();
754 Some(rx)
755 });
756 let mut mock_ledger = MockLedger::default();
757 mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
758 mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
759 mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
760 mock_ledger.expect_check_solution_basic().returning(|_, _| Err(anyhow!("")));
761 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
762 let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
764
765 let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
767 let solution_id = rng.gen::<u64>().into();
768 let solution = Data::Buffer(Bytes::from((0..512).map(|_| rng.gen::<u8>()).collect::<Vec<_>>()));
769 let checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
770 let transmission_id = TransmissionID::Solution(solution_id, checksum);
771 let worker_ = worker.clone();
772 let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
773 let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
774 assert!(worker.pending.contains(transmission_id));
775 let result = worker.process_unconfirmed_solution(solution_id, solution).await;
776 assert!(result.is_err());
777 assert!(!worker.pending.contains(transmission_id));
778 assert!(!worker.ready.contains(transmission_id));
779 }
780
781 #[tokio::test]
782 async fn test_process_transaction_ok() {
783 let mut rng = &mut TestRng::default();
784 let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
786 let committee_clone = committee.clone();
787 let mut gateway = MockGateway::default();
789 gateway.expect_send().returning(|_, _| {
790 let (_tx, rx) = oneshot::channel();
791 Some(rx)
792 });
793 let mut mock_ledger = MockLedger::default();
794 mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
795 mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
796 mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
797 mock_ledger.expect_check_transaction_basic().returning(|_, _| Ok(()));
798 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
799 let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
801
802 let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
804 let transaction_id: <CurrentNetwork as Network>::TransactionID = Field::<CurrentNetwork>::rand(&mut rng).into();
805 let transaction = Data::Buffer(Bytes::from((0..512).map(|_| rng.gen::<u8>()).collect::<Vec<_>>()));
806 let checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
807 let transmission_id = TransmissionID::Transaction(transaction_id, checksum);
808 let worker_ = worker.clone();
809 let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
810 let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
811 assert!(worker.pending.contains(transmission_id));
812 let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
813 assert!(result.is_ok());
814 assert!(!worker.pending.contains(transmission_id));
815 assert!(worker.ready.contains(transmission_id));
816 }
817
818 #[tokio::test]
819 async fn test_process_transaction_nok() {
820 let mut rng = &mut TestRng::default();
821 let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
823 let committee_clone = committee.clone();
824 let mut gateway = MockGateway::default();
826 gateway.expect_send().returning(|_, _| {
827 let (_tx, rx) = oneshot::channel();
828 Some(rx)
829 });
830 let mut mock_ledger = MockLedger::default();
831 mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
832 mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
833 mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
834 mock_ledger.expect_check_transaction_basic().returning(|_, _| Err(anyhow!("")));
835 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
836 let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
838
839 let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
841 let transaction_id: <CurrentNetwork as Network>::TransactionID = Field::<CurrentNetwork>::rand(&mut rng).into();
842 let transaction = Data::Buffer(Bytes::from((0..512).map(|_| rng.gen::<u8>()).collect::<Vec<_>>()));
843 let checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
844 let transmission_id = TransmissionID::Transaction(transaction_id, checksum);
845 let worker_ = worker.clone();
846 let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
847 let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
848 assert!(worker.pending.contains(transmission_id));
849 let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
850 assert!(result.is_err());
851 assert!(!worker.pending.contains(transmission_id));
852 assert!(!worker.ready.contains(transmission_id));
853 }
854
855 #[tokio::test]
856 async fn test_flood_transmission_requests() {
857 let mut rng = &mut TestRng::default();
858 let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
860 let committee_clone = committee.clone();
861 let mut gateway = MockGateway::default();
863 gateway.expect_send().returning(|_, _| {
864 let (_tx, rx) = oneshot::channel();
865 Some(rx)
866 });
867 let mut mock_ledger = MockLedger::default();
868 mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
869 mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
870 mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
871 mock_ledger.expect_check_transaction_basic().returning(|_, _| Ok(()));
872 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
873 let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
875
876 let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
878 let transaction_id: <CurrentNetwork as Network>::TransactionID = Field::<CurrentNetwork>::rand(&mut rng).into();
879 let transaction = Data::Buffer(Bytes::from((0..512).map(|_| rng.gen::<u8>()).collect::<Vec<_>>()));
880 let checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
881 let transmission_id = TransmissionID::Transaction(transaction_id, checksum);
882
883 let num_redundant_requests = max_redundant_requests(worker.ledger.clone(), worker.storage.current_round());
885 let num_flood_requests = num_redundant_requests * 10;
886 let mut peer_ips =
887 (0..num_flood_requests).map(|i| SocketAddr::from(([127, 0, 0, 1], 1234 + i as u16))).collect_vec();
888 let first_peer_ip = peer_ips[0];
889
890 for i in 1..=num_flood_requests {
892 let worker_ = worker.clone();
893 let peer_ip = peer_ips.pop().unwrap();
894 tokio::spawn(async move {
895 let _ = worker_.send_transmission_request(peer_ip, transmission_id).await;
896 });
897 tokio::time::sleep(Duration::from_millis(10)).await;
898 assert!(worker.pending.num_sent_requests(transmission_id) <= num_redundant_requests);
900 assert_eq!(worker.pending.num_callbacks(transmission_id), i);
901 }
902 assert_eq!(worker.pending.num_sent_requests(transmission_id), num_redundant_requests);
904 assert_eq!(worker.pending.num_callbacks(transmission_id), num_flood_requests);
905
906 tokio::time::sleep(Duration::from_secs(CALLBACK_EXPIRATION_IN_SECS as u64 + 1)).await;
908 assert_eq!(worker.pending.num_sent_requests(transmission_id), 0);
909 assert_eq!(worker.pending.num_callbacks(transmission_id), 0);
910
911 for i in 1..=num_flood_requests {
913 let worker_ = worker.clone();
914 tokio::spawn(async move {
915 let _ = worker_.send_transmission_request(first_peer_ip, transmission_id).await;
916 });
917 tokio::time::sleep(Duration::from_millis(10)).await;
918 assert!(worker.pending.num_sent_requests(transmission_id) <= num_redundant_requests);
919 assert_eq!(worker.pending.num_callbacks(transmission_id), i);
920 }
921 assert_eq!(worker.pending.num_sent_requests(transmission_id), 1);
923 assert_eq!(worker.pending.num_callbacks(transmission_id), num_flood_requests);
924
925 let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
927 assert!(result.is_ok());
928 assert_eq!(worker.pending.num_sent_requests(transmission_id), 0);
929 assert_eq!(worker.pending.num_callbacks(transmission_id), 0);
930 assert!(!worker.pending.contains(transmission_id));
931 assert!(worker.ready.contains(transmission_id));
932 }
933
934 #[tokio::test]
935 async fn test_storage_gc_on_initialization() {
936 let rng = &mut TestRng::default();
937
938 for _ in 0..ITERATIONS {
939 let max_gc_rounds = rng.gen_range(50..=100);
941 let latest_ledger_round = rng.gen_range((max_gc_rounds + 1)..1000);
942 let expected_gc_round = latest_ledger_round - max_gc_rounds;
943
944 let committee =
946 snarkvm::ledger::committee::test_helpers::sample_committee_for_round(latest_ledger_round, rng);
947
948 let mut mock_ledger = MockLedger::default();
950 mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
951
952 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
953 let storage =
955 Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
956
957 assert_eq!(storage.gc_round(), expected_gc_round);
959 }
960 }
961}
962
963#[cfg(test)]
964mod prop_tests {
965 use super::*;
966 use crate::Gateway;
967 use snarkos_node_bft_ledger_service::MockLedgerService;
968 use snarkvm::{
969 console::account::Address,
970 ledger::committee::{Committee, MIN_VALIDATOR_STAKE},
971 };
972
973 use test_strategy::proptest;
974
975 type CurrentNetwork = snarkvm::prelude::MainnetV0;
976
977 fn new_test_committee(n: u16) -> Committee<CurrentNetwork> {
979 let mut members = IndexMap::with_capacity(n as usize);
980 for i in 0..n {
981 let rng = &mut TestRng::fixed(i as u64);
983 let address = Address::new(rng.gen());
984 info!("Validator {i}: {address}");
985 members.insert(address, (MIN_VALIDATOR_STAKE, false, rng.gen_range(0..100)));
986 }
987 Committee::<CurrentNetwork>::new(1u64, members).unwrap()
989 }
990
991 #[proptest]
992 fn worker_initialization(
993 #[strategy(0..MAX_WORKERS)] id: u8,
994 gateway: Gateway<CurrentNetwork>,
995 storage: Storage<CurrentNetwork>,
996 ) {
997 let committee = new_test_committee(4);
998 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(MockLedgerService::new(committee));
999 let worker = Worker::new(id, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
1000 assert_eq!(worker.id(), id);
1001 }
1002
1003 #[proptest]
1004 fn invalid_worker_id(
1005 #[strategy(MAX_WORKERS..)] id: u8,
1006 gateway: Gateway<CurrentNetwork>,
1007 storage: Storage<CurrentNetwork>,
1008 ) {
1009 let committee = new_test_committee(4);
1010 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(MockLedgerService::new(committee));
1011 let worker = Worker::new(id, Arc::new(gateway), storage, ledger, Default::default());
1012 if let Err(error) = worker {
1014 assert_eq!(error.to_string(), format!("Invalid worker ID '{}'", id));
1015 }
1016 }
1017}