1use crate::{
17 MAX_FETCH_TIMEOUT_IN_MS,
18 MAX_WORKERS,
19 ProposedBatch,
20 Transport,
21 events::{Event, TransmissionRequest, TransmissionResponse},
22 helpers::{Pending, Ready, Storage, WorkerReceiver, fmt_id, max_redundant_requests},
23 spawn_blocking,
24};
25use snarkos_node_bft_ledger_service::LedgerService;
26use snarkvm::{
27 console::prelude::*,
28 ledger::{
29 block::Transaction,
30 narwhal::{BatchHeader, Data, Transmission, TransmissionID},
31 puzzle::{Solution, SolutionID},
32 },
33};
34
35use colored::Colorize;
36use indexmap::{IndexMap, IndexSet};
37use parking_lot::Mutex;
38use rand::seq::IteratorRandom;
39use std::{future::Future, net::SocketAddr, sync::Arc, time::Duration};
40use tokio::{sync::oneshot, task::JoinHandle, time::timeout};
41
42#[derive(Clone)]
43pub struct Worker<N: Network> {
44 id: u8,
46 gateway: Arc<dyn Transport<N>>,
48 storage: Storage<N>,
50 ledger: Arc<dyn LedgerService<N>>,
52 proposed_batch: Arc<ProposedBatch<N>>,
54 ready: Ready<N>,
56 pending: Arc<Pending<TransmissionID<N>, Transmission<N>>>,
58 handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
60}
61
62impl<N: Network> Worker<N> {
63 pub fn new(
65 id: u8,
66 gateway: Arc<dyn Transport<N>>,
67 storage: Storage<N>,
68 ledger: Arc<dyn LedgerService<N>>,
69 proposed_batch: Arc<ProposedBatch<N>>,
70 ) -> Result<Self> {
71 ensure!(id < MAX_WORKERS, "Invalid worker ID '{id}'");
73 Ok(Self {
75 id,
76 gateway,
77 storage,
78 ledger,
79 proposed_batch,
80 ready: Default::default(),
81 pending: Default::default(),
82 handles: Default::default(),
83 })
84 }
85
86 pub fn run(&self, receiver: WorkerReceiver<N>) {
88 info!("Starting worker instance {} of the memory pool...", self.id);
89 self.start_handlers(receiver);
91 }
92
93 pub const fn id(&self) -> u8 {
95 self.id
96 }
97
98 pub fn pending(&self) -> &Arc<Pending<TransmissionID<N>, Transmission<N>>> {
100 &self.pending
101 }
102}
103
104impl<N: Network> Worker<N> {
105 pub const MAX_TRANSMISSIONS_PER_WORKER: usize =
107 BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH / MAX_WORKERS as usize;
108 pub const MAX_TRANSMISSIONS_PER_WORKER_PING: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH / 10;
110
111 pub fn num_transmissions(&self) -> usize {
115 self.ready.num_transmissions()
116 }
117
118 pub fn num_ratifications(&self) -> usize {
120 self.ready.num_ratifications()
121 }
122
123 pub fn num_solutions(&self) -> usize {
125 self.ready.num_solutions()
126 }
127
128 pub fn num_transactions(&self) -> usize {
130 self.ready.num_transactions()
131 }
132}
133
134impl<N: Network> Worker<N> {
135 pub fn transmission_ids(&self) -> IndexSet<TransmissionID<N>> {
137 self.ready.transmission_ids()
138 }
139
140 pub fn transmissions(&self) -> IndexMap<TransmissionID<N>, Transmission<N>> {
142 self.ready.transmissions()
143 }
144
145 pub fn solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
147 self.ready.solutions()
148 }
149
150 pub fn transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
152 self.ready.transactions()
153 }
154}
155
156impl<N: Network> Worker<N> {
157 pub(super) fn clear_solutions(&self) {
159 self.ready.clear_solutions()
160 }
161}
162
163impl<N: Network> Worker<N> {
164 pub fn contains_transmission(&self, transmission_id: impl Into<TransmissionID<N>>) -> bool {
166 let transmission_id = transmission_id.into();
167 self.ready.contains(transmission_id)
169 || self.proposed_batch.read().as_ref().map_or(false, |p| p.contains_transmission(transmission_id))
170 || self.storage.contains_transmission(transmission_id)
171 || self.ledger.contains_transmission(&transmission_id).unwrap_or(false)
172 }
173
174 pub fn get_transmission(&self, transmission_id: TransmissionID<N>) -> Option<Transmission<N>> {
179 if let Some(transmission) = self.ready.get(transmission_id) {
181 return Some(transmission);
182 }
183 if let Some(transmission) = self.storage.get_transmission(transmission_id) {
185 return Some(transmission);
186 }
187 if let Some(transmission) =
189 self.proposed_batch.read().as_ref().and_then(|p| p.get_transmission(transmission_id))
190 {
191 return Some(transmission.clone());
192 }
193 None
194 }
195
196 pub async fn get_or_fetch_transmission(
198 &self,
199 peer_ip: SocketAddr,
200 transmission_id: TransmissionID<N>,
201 ) -> Result<(TransmissionID<N>, Transmission<N>)> {
202 if let Some(transmission) = self.get_transmission(transmission_id) {
204 return Ok((transmission_id, transmission));
205 }
206 let (candidate_id, transmission) = self.send_transmission_request(peer_ip, transmission_id).await?;
208 ensure!(candidate_id == transmission_id, "Invalid transmission ID");
210 Ok((transmission_id, transmission))
212 }
213
214 pub(crate) fn drain(&self, num_transmissions: usize) -> impl Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
216 self.ready.drain(num_transmissions).into_iter()
217 }
218
219 pub(crate) fn reinsert(&self, transmission_id: TransmissionID<N>, transmission: Transmission<N>) -> bool {
221 if !self.contains_transmission(transmission_id) {
223 return self.ready.insert(transmission_id, transmission);
225 }
226 false
227 }
228
229 pub(crate) fn broadcast_ping(&self) {
231 let transmission_ids = self
233 .ready
234 .transmission_ids()
235 .into_iter()
236 .choose_multiple(&mut rand::thread_rng(), Self::MAX_TRANSMISSIONS_PER_WORKER_PING)
237 .into_iter()
238 .collect::<IndexSet<_>>();
239
240 if !transmission_ids.is_empty() {
242 self.gateway.broadcast(Event::WorkerPing(transmission_ids.into()));
243 }
244 }
245}
246
247impl<N: Network> Worker<N> {
248 fn process_transmission_id_from_ping(&self, peer_ip: SocketAddr, transmission_id: TransmissionID<N>) {
250 if self.contains_transmission(transmission_id) {
252 return;
253 }
254 if self.ready.num_transmissions() > Self::MAX_TRANSMISSIONS_PER_WORKER {
257 return;
258 }
259 let self_ = self.clone();
261 tokio::spawn(async move {
262 match self_.send_transmission_request(peer_ip, transmission_id).await {
264 Ok((candidate_id, transmission)) => {
266 if candidate_id == transmission_id {
268 self_.process_transmission_from_peer(peer_ip, transmission_id, transmission);
272 }
273 }
274 Err(e) => {
276 warn!(
277 "Worker {} - Failed to fetch transmission '{}.{}' from '{peer_ip}' (ping) - {e}",
278 self_.id,
279 fmt_id(transmission_id),
280 fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
281 );
282 }
283 }
284 });
285 }
286
287 pub(crate) fn process_transmission_from_peer(
289 &self,
290 peer_ip: SocketAddr,
291 transmission_id: TransmissionID<N>,
292 transmission: Transmission<N>,
293 ) {
294 if self.contains_transmission(transmission_id) {
296 return;
297 }
298 let is_well_formed = match (&transmission_id, &transmission) {
300 (TransmissionID::Solution(_, _), Transmission::Solution(_)) => true,
301 (TransmissionID::Transaction(_, _), Transmission::Transaction(_)) => true,
302 (TransmissionID::Ratification, Transmission::Ratification) => false,
305 _ => false,
307 };
308 if let (TransmissionID::Transaction(tx_id, _), Transmission::Transaction(tx)) = (transmission_id, &transmission)
311 {
312 if let Data::Object(Transaction::Execute(..)) = tx {
313 let self_ = self.clone();
314 let tx_ = tx.clone();
315 tokio::spawn(async move {
316 let _ = self_.ledger.check_transaction_basic(tx_id, tx_).await;
317 });
318 }
319 }
320 if is_well_formed && self.ready.insert(transmission_id, transmission) {
322 trace!(
323 "Worker {} - Added transmission '{}.{}' from '{peer_ip}'",
324 self.id,
325 fmt_id(transmission_id),
326 fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
327 );
328 }
329 }
330
331 pub(crate) async fn process_unconfirmed_solution(
334 &self,
335 solution_id: SolutionID<N>,
336 solution: Data<Solution<N>>,
337 ) -> Result<()> {
338 let transmission = Transmission::Solution(solution.clone());
340 let checksum = solution.to_checksum::<N>()?;
342 let transmission_id = TransmissionID::Solution(solution_id, checksum);
344 self.pending.remove(transmission_id, Some(transmission.clone()));
346 if self.contains_transmission(transmission_id) {
348 bail!("Solution '{}.{}' already exists.", fmt_id(solution_id), fmt_id(checksum).dimmed());
349 }
350 self.ledger.check_solution_basic(solution_id, solution).await?;
352 if self.ready.insert(transmission_id, transmission) {
354 trace!(
355 "Worker {} - Added unconfirmed solution '{}.{}'",
356 self.id,
357 fmt_id(solution_id),
358 fmt_id(checksum).dimmed()
359 );
360 }
361 Ok(())
362 }
363
364 pub(crate) async fn process_unconfirmed_transaction(
366 &self,
367 transaction_id: N::TransactionID,
368 transaction: Data<Transaction<N>>,
369 ) -> Result<()> {
370 let transmission = Transmission::Transaction(transaction.clone());
372 let checksum = transaction.to_checksum::<N>()?;
374 let transmission_id = TransmissionID::Transaction(transaction_id, checksum);
376 self.pending.remove(transmission_id, Some(transmission.clone()));
378 if self.contains_transmission(transmission_id) {
380 bail!("Transaction '{}.{}' already exists.", fmt_id(transaction_id), fmt_id(checksum).dimmed());
381 }
382 self.ledger.check_transaction_basic(transaction_id, transaction).await?;
384 if self.ready.insert(transmission_id, transmission) {
386 trace!(
387 "Worker {}.{} - Added unconfirmed transaction '{}'",
388 self.id,
389 fmt_id(transaction_id),
390 fmt_id(checksum).dimmed()
391 );
392 }
393 Ok(())
394 }
395}
396
397impl<N: Network> Worker<N> {
398 fn start_handlers(&self, receiver: WorkerReceiver<N>) {
400 let WorkerReceiver { mut rx_worker_ping, mut rx_transmission_request, mut rx_transmission_response } = receiver;
401
402 let self_ = self.clone();
404 self.spawn(async move {
405 loop {
406 tokio::time::sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await;
408
409 let self__ = self_.clone();
411 let _ = spawn_blocking!({
412 self__.pending.clear_expired_callbacks();
413 Ok(())
414 });
415 }
416 });
417
418 let self_ = self.clone();
420 self.spawn(async move {
421 while let Some((peer_ip, transmission_id)) = rx_worker_ping.recv().await {
422 self_.process_transmission_id_from_ping(peer_ip, transmission_id);
423 }
424 });
425
426 let self_ = self.clone();
428 self.spawn(async move {
429 while let Some((peer_ip, transmission_request)) = rx_transmission_request.recv().await {
430 self_.send_transmission_response(peer_ip, transmission_request);
431 }
432 });
433
434 let self_ = self.clone();
436 self.spawn(async move {
437 while let Some((peer_ip, transmission_response)) = rx_transmission_response.recv().await {
438 let self__ = self_.clone();
440 let _ = spawn_blocking!({
441 self__.finish_transmission_request(peer_ip, transmission_response);
442 Ok(())
443 });
444 }
445 });
446 }
447
448 async fn send_transmission_request(
450 &self,
451 peer_ip: SocketAddr,
452 transmission_id: TransmissionID<N>,
453 ) -> Result<(TransmissionID<N>, Transmission<N>)> {
454 let (callback_sender, callback_receiver) = oneshot::channel();
456 let num_sent_requests = self.pending.num_sent_requests(transmission_id);
458 let contains_peer_with_sent_request = self.pending.contains_peer_with_sent_request(transmission_id, peer_ip);
460 let num_redundant_requests = max_redundant_requests(self.ledger.clone(), self.storage.current_round())?;
462 let should_send_request = num_sent_requests < num_redundant_requests && !contains_peer_with_sent_request;
465
466 self.pending.insert(transmission_id, peer_ip, Some((callback_sender, should_send_request)));
468
469 if should_send_request {
471 if self.gateway.send(peer_ip, Event::TransmissionRequest(transmission_id.into())).await.is_none() {
473 bail!("Unable to fetch transmission - failed to send request")
474 }
475 } else {
476 debug!(
477 "Skipped sending request for transmission {}.{} to '{peer_ip}' ({num_sent_requests} redundant requests)",
478 fmt_id(transmission_id),
479 fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
480 );
481 }
482 match timeout(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS), callback_receiver).await {
484 Ok(result) => Ok((transmission_id, result?)),
486 Err(e) => bail!("Unable to fetch transmission - (timeout) {e}"),
488 }
489 }
490
491 fn finish_transmission_request(&self, peer_ip: SocketAddr, response: TransmissionResponse<N>) {
494 let TransmissionResponse { transmission_id, mut transmission } = response;
495 let exists = self.pending.get_peers(transmission_id).unwrap_or_default().contains(&peer_ip);
497 if exists {
499 match self.ledger.ensure_transmission_is_well_formed(transmission_id, &mut transmission) {
501 Ok(()) => {
502 self.pending.remove(transmission_id, Some(transmission));
504 }
505 Err(err) => warn!("Failed to finish transmission response from peer '{peer_ip}': {err}"),
506 };
507 }
508 }
509
510 fn send_transmission_response(&self, peer_ip: SocketAddr, request: TransmissionRequest<N>) {
512 let TransmissionRequest { transmission_id } = request;
513 if let Some(transmission) = self.get_transmission(transmission_id) {
515 let self_ = self.clone();
517 tokio::spawn(async move {
518 self_.gateway.send(peer_ip, Event::TransmissionResponse((transmission_id, transmission).into())).await;
519 });
520 }
521 }
522
523 fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
525 self.handles.lock().push(tokio::spawn(future));
526 }
527
528 pub(crate) fn shut_down(&self) {
530 trace!("Shutting down worker {}...", self.id);
531 self.handles.lock().iter().for_each(|handle| handle.abort());
533 }
534}
535
#[cfg(test)]
mod tests {
    use super::*;
    use crate::helpers::CALLBACK_EXPIRATION_IN_SECS;
    use snarkos_node_bft_ledger_service::LedgerService;
    use snarkos_node_bft_storage_service::BFTMemoryService;
    use snarkvm::{
        console::{network::Network, types::Field},
        ledger::{
            block::Block,
            committee::Committee,
            narwhal::{BatchCertificate, Subdag, Transmission, TransmissionID},
        },
        prelude::Address,
    };

    use bytes::Bytes;
    use indexmap::IndexMap;
    use mockall::mock;
    use std::{io, ops::Range};

    type CurrentNetwork = snarkvm::prelude::MainnetV0;

    const ITERATIONS: usize = 100;

    // A mock of the `Transport` trait, standing in for the gateway.
    mock! {
        Gateway<N: Network> {}
        #[async_trait]
        impl<N:Network> Transport<N> for Gateway<N> {
            fn broadcast(&self, event: Event<N>);
            async fn send(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>>;
        }
    }

    // A mock of the `LedgerService` trait.
    mock! {
        #[derive(Debug)]
        Ledger<N: Network> {}
        #[async_trait]
        impl<N: Network> LedgerService<N> for Ledger<N> {
            fn latest_round(&self) -> u64;
            fn latest_block_height(&self) -> u32;
            fn latest_block(&self) -> Block<N>;
            fn latest_restrictions_id(&self) -> Field<N>;
            fn latest_leader(&self) -> Option<(u64, Address<N>)>;
            fn update_latest_leader(&self, round: u64, leader: Address<N>);
            fn contains_block_height(&self, height: u32) -> bool;
            fn get_block_height(&self, hash: &N::BlockHash) -> Result<u32>;
            fn get_block_hash(&self, height: u32) -> Result<N::BlockHash>;
            fn get_block_round(&self, height: u32) -> Result<u64>;
            fn get_block(&self, height: u32) -> Result<Block<N>>;
            fn get_blocks(&self, heights: Range<u32>) -> Result<Vec<Block<N>>>;
            fn get_solution(&self, solution_id: &SolutionID<N>) -> Result<Solution<N>>;
            fn get_unconfirmed_transaction(&self, transaction_id: N::TransactionID) -> Result<Transaction<N>>;
            fn get_batch_certificate(&self, certificate_id: &Field<N>) -> Result<BatchCertificate<N>>;
            fn current_committee(&self) -> Result<Committee<N>>;
            fn get_committee_for_round(&self, round: u64) -> Result<Committee<N>>;
            fn get_committee_lookback_for_round(&self, round: u64) -> Result<Committee<N>>;
            fn contains_certificate(&self, certificate_id: &Field<N>) -> Result<bool>;
            fn contains_transmission(&self, transmission_id: &TransmissionID<N>) -> Result<bool>;
            fn ensure_transmission_is_well_formed(
                &self,
                transmission_id: TransmissionID<N>,
                transmission: &mut Transmission<N>,
            ) -> Result<()>;
            async fn check_solution_basic(
                &self,
                solution_id: SolutionID<N>,
                solution: Data<Solution<N>>,
            ) -> Result<()>;
            async fn check_transaction_basic(
                &self,
                transaction_id: N::TransactionID,
                transaction: Data<Transaction<N>>,
            ) -> Result<()>;
            fn check_next_block(&self, block: &Block<N>) -> Result<()>;
            fn prepare_advance_to_next_quorum_block(
                &self,
                subdag: Subdag<N>,
                transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
            ) -> Result<Block<N>>;
            fn advance_to_next_block(&self, block: &Block<N>) -> Result<()>;
        }
    }

    /// Returns the peer address shared by the tests.
    fn sample_peer_ip() -> SocketAddr {
        SocketAddr::from(([127, 0, 0, 1], 1234))
    }

    /// Returns 512 random bytes.
    fn sample_bytes(rng: &mut TestRng) -> Bytes {
        Bytes::from((0..512).map(|_| rng.gen::<u8>()).collect::<Vec<_>>())
    }

    /// Returns a mock gateway whose `send` always hands back a receiver.
    fn sending_gateway() -> MockGateway<CurrentNetwork> {
        let mut gateway = MockGateway::default();
        gateway.expect_send().returning(|_, _| {
            let (_tx, rx) = oneshot::channel();
            Some(rx)
        });
        gateway
    }

    /// Returns a mock ledger that serves the given committee, both as the current committee
    /// and as the committee lookback of any round.
    fn ledger_with_committee(committee: Committee<CurrentNetwork>) -> MockLedger<CurrentNetwork> {
        let lookback_committee = committee.clone();
        let mut mock_ledger = MockLedger::default();
        mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
        mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(lookback_committee.clone()));
        mock_ledger
    }

    /// Returns worker `0`, backed by the given mocks and an in-memory storage with `max_gc_rounds = 1`.
    fn sample_worker(
        gateway: MockGateway<CurrentNetwork>,
        mock_ledger: MockLedger<CurrentNetwork>,
    ) -> Worker<CurrentNetwork> {
        let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
        let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
        Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap()
    }

    #[tokio::test]
    async fn test_max_redundant_requests() {
        let num_nodes: u16 = CurrentNetwork::MAX_CERTIFICATES.first().unwrap().1;

        let rng = &mut TestRng::default();
        // Sample a committee with the chosen number of members.
        let committee =
            snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_size(0, num_nodes, rng);
        // Set up the mock ledger.
        let mut mock_ledger = ledger_with_committee(committee);
        mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
        mock_ledger.expect_check_solution_basic().returning(|_, _| Ok(()));
        let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);

        // Pin the expected number of redundant requests.
        assert_eq!(max_redundant_requests(ledger, 0).unwrap(), 6, "Update me if the formula changes");
    }

    #[tokio::test]
    async fn test_process_transmission() {
        let rng = &mut TestRng::default();
        // Set up a worker whose gateway has no expectations.
        let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
        let mut mock_ledger = ledger_with_committee(committee);
        mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
        mock_ledger.expect_check_solution_basic().returning(|_, _| Ok(()));
        let worker = sample_worker(MockGateway::default(), mock_ledger);

        // Sample a solution transmission.
        let transmission_id = TransmissionID::Solution(
            rng.gen::<u64>().into(),
            rng.gen::<<CurrentNetwork as Network>::TransmissionChecksum>(),
        );
        let transmission = Transmission::Solution(Data::Buffer(sample_bytes(rng)));

        // Process the transmission, and check that it landed in the ready queue.
        worker.process_transmission_from_peer(sample_peer_ip(), transmission_id, transmission.clone());
        assert!(worker.contains_transmission(transmission_id));
        assert!(worker.ready.contains(transmission_id));
        assert_eq!(worker.get_transmission(transmission_id), Some(transmission));
        // Draining removes the transmission from the ready queue.
        let drained: Vec<_> = worker.drain(1).collect();
        assert_eq!(drained.len(), 1);
        assert!(!worker.ready.contains(transmission_id));
    }

    #[tokio::test]
    async fn test_send_transmission() {
        let rng = &mut TestRng::default();
        // Set up the worker.
        let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
        let mut mock_ledger = ledger_with_committee(committee);
        mock_ledger.expect_ensure_transmission_is_well_formed().returning(|_, _| Ok(()));
        let worker = sample_worker(sending_gateway(), mock_ledger);

        let transmission_id = TransmissionID::Solution(
            rng.gen::<u64>().into(),
            rng.gen::<<CurrentNetwork as Network>::TransmissionChecksum>(),
        );
        let peer_ip = sample_peer_ip();

        // Request the transmission; the request result is ignored, but the pending entry remains.
        let requester = worker.clone();
        let _ = requester.send_transmission_request(peer_ip, transmission_id).await;
        assert!(worker.pending.contains(transmission_id));

        // Finishing the request with a response from the same peer clears the pending entry.
        let response = TransmissionResponse {
            transmission_id,
            transmission: Transmission::Solution(Data::Buffer(Bytes::from(vec![0; 512]))),
        };
        worker.finish_transmission_request(peer_ip, response);
        assert!(!worker.pending.contains(transmission_id));
    }

    #[tokio::test]
    async fn test_process_solution_ok() {
        let rng = &mut TestRng::default();
        // Set up a worker whose ledger accepts every solution.
        let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
        let mut mock_ledger = ledger_with_committee(committee);
        mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
        mock_ledger.expect_check_solution_basic().returning(|_, _| Ok(()));
        let worker = sample_worker(sending_gateway(), mock_ledger);

        // Sample a solution.
        let solution = Data::Buffer(sample_bytes(rng));
        let solution_id = rng.gen::<u64>().into();
        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
        let transmission_id = TransmissionID::Solution(solution_id, solution_checksum);

        // Request the solution, so that it is pending.
        let requester = worker.clone();
        let _ = requester.send_transmission_request(sample_peer_ip(), transmission_id).await;
        assert!(worker.pending.contains(transmission_id));

        // The accepted solution moves from the pending queue into the ready queue.
        let result = worker.process_unconfirmed_solution(solution_id, solution).await;
        assert!(result.is_ok());
        assert!(!worker.pending.contains(transmission_id));
        assert!(worker.ready.contains(transmission_id));
    }

    #[tokio::test]
    async fn test_process_solution_nok() {
        let rng = &mut TestRng::default();
        // Set up a worker whose ledger rejects every solution.
        let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
        let mut mock_ledger = ledger_with_committee(committee);
        mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
        mock_ledger.expect_check_solution_basic().returning(|_, _| Err(anyhow!("")));
        let worker = sample_worker(sending_gateway(), mock_ledger);

        // Sample a solution.
        let solution_id = rng.gen::<u64>().into();
        let solution = Data::Buffer(sample_bytes(rng));
        let checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
        let transmission_id = TransmissionID::Solution(solution_id, checksum);

        // Request the solution, so that it is pending.
        let requester = worker.clone();
        let _ = requester.send_transmission_request(sample_peer_ip(), transmission_id).await;
        assert!(worker.pending.contains(transmission_id));

        // The rejected solution leaves the pending queue, without entering the ready queue.
        let result = worker.process_unconfirmed_solution(solution_id, solution).await;
        assert!(result.is_err());
        assert!(!worker.pending.contains(transmission_id));
        assert!(!worker.ready.contains(transmission_id));
    }

    #[tokio::test]
    async fn test_process_transaction_ok() {
        let rng = &mut TestRng::default();
        // Set up a worker whose ledger accepts every transaction.
        let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
        let mut mock_ledger = ledger_with_committee(committee);
        mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
        mock_ledger.expect_check_transaction_basic().returning(|_, _| Ok(()));
        let worker = sample_worker(sending_gateway(), mock_ledger);

        // Sample a transaction.
        let transaction_id: <CurrentNetwork as Network>::TransactionID = Field::<CurrentNetwork>::rand(rng).into();
        let transaction = Data::Buffer(sample_bytes(rng));
        let checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
        let transmission_id = TransmissionID::Transaction(transaction_id, checksum);

        // Request the transaction, so that it is pending.
        let requester = worker.clone();
        let _ = requester.send_transmission_request(sample_peer_ip(), transmission_id).await;
        assert!(worker.pending.contains(transmission_id));

        // The accepted transaction moves from the pending queue into the ready queue.
        let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
        assert!(result.is_ok());
        assert!(!worker.pending.contains(transmission_id));
        assert!(worker.ready.contains(transmission_id));
    }

    #[tokio::test]
    async fn test_process_transaction_nok() {
        let rng = &mut TestRng::default();
        // Set up a worker whose ledger rejects every transaction.
        let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
        let mut mock_ledger = ledger_with_committee(committee);
        mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
        mock_ledger.expect_check_transaction_basic().returning(|_, _| Err(anyhow!("")));
        let worker = sample_worker(sending_gateway(), mock_ledger);

        // Sample a transaction.
        let transaction_id: <CurrentNetwork as Network>::TransactionID = Field::<CurrentNetwork>::rand(rng).into();
        let transaction = Data::Buffer(sample_bytes(rng));
        let checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
        let transmission_id = TransmissionID::Transaction(transaction_id, checksum);

        // Request the transaction, so that it is pending.
        let requester = worker.clone();
        let _ = requester.send_transmission_request(sample_peer_ip(), transmission_id).await;
        assert!(worker.pending.contains(transmission_id));

        // The rejected transaction leaves the pending queue, without entering the ready queue.
        let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
        assert!(result.is_err());
        assert!(!worker.pending.contains(transmission_id));
        assert!(!worker.ready.contains(transmission_id));
    }

    #[tokio::test]
    async fn test_flood_transmission_requests() {
        let rng = &mut TestRng::default();
        // Set up a worker whose ledger accepts every transaction.
        let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
        let mut mock_ledger = ledger_with_committee(committee);
        mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
        mock_ledger.expect_check_transaction_basic().returning(|_, _| Ok(()));
        let worker = sample_worker(sending_gateway(), mock_ledger);

        // Sample a transaction.
        let transaction_id: <CurrentNetwork as Network>::TransactionID = Field::<CurrentNetwork>::rand(rng).into();
        let transaction = Data::Buffer(sample_bytes(rng));
        let checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
        let transmission_id = TransmissionID::Transaction(transaction_id, checksum);

        // Determine the number of redundant requests, and flood with ten times as many.
        let num_redundant_requests =
            max_redundant_requests(worker.ledger.clone(), worker.storage.current_round()).unwrap();
        let num_flood_requests = num_redundant_requests * 10;
        let mut peer_ips =
            (0..num_flood_requests).map(|i| SocketAddr::from(([127, 0, 0, 1], 1234 + i as u16))).collect_vec();
        let first_peer_ip = peer_ips[0];

        // Flood the pending queue with requests, each to a different peer.
        for i in 1..=num_flood_requests {
            let requester = worker.clone();
            let peer_ip = peer_ips.pop().unwrap();
            tokio::spawn(async move {
                let _ = requester.send_transmission_request(peer_ip, transmission_id).await;
            });
            tokio::time::sleep(Duration::from_millis(10)).await;
            // Every request registers a callback, but the sent requests stay within the limit.
            assert!(worker.pending.num_sent_requests(transmission_id) <= num_redundant_requests);
            assert_eq!(worker.pending.num_callbacks(transmission_id), i);
        }
        // The sent requests reached the limit exactly.
        assert_eq!(worker.pending.num_sent_requests(transmission_id), num_redundant_requests);
        assert_eq!(worker.pending.num_callbacks(transmission_id), num_flood_requests);

        // Let all the requests expire.
        tokio::time::sleep(Duration::from_secs(CALLBACK_EXPIRATION_IN_SECS as u64 + 1)).await;
        assert_eq!(worker.pending.num_sent_requests(transmission_id), 0);
        assert_eq!(worker.pending.num_callbacks(transmission_id), 0);

        // Flood the pending queue again, this time with every request going to a single peer.
        for i in 1..=num_flood_requests {
            let requester = worker.clone();
            tokio::spawn(async move {
                let _ = requester.send_transmission_request(first_peer_ip, transmission_id).await;
            });
            tokio::time::sleep(Duration::from_millis(10)).await;
            assert!(worker.pending.num_sent_requests(transmission_id) <= num_redundant_requests);
            assert_eq!(worker.pending.num_callbacks(transmission_id), i);
        }
        // A single peer only receives one request.
        assert_eq!(worker.pending.num_sent_requests(transmission_id), 1);
        assert_eq!(worker.pending.num_callbacks(transmission_id), num_flood_requests);

        // Processing the transaction clears the pending entry, and fills the ready queue.
        let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
        assert!(result.is_ok());
        assert_eq!(worker.pending.num_sent_requests(transmission_id), 0);
        assert_eq!(worker.pending.num_callbacks(transmission_id), 0);
        assert!(!worker.pending.contains(transmission_id));
        assert!(worker.ready.contains(transmission_id));
    }

    #[tokio::test]
    async fn test_storage_gc_on_initialization() {
        let rng = &mut TestRng::default();

        for _ in 0..ITERATIONS {
            // Sample `max_gc_rounds`, and a latest ledger round that is strictly larger.
            let max_gc_rounds = rng.gen_range(50..=100);
            let latest_ledger_round = rng.gen_range((max_gc_rounds + 1)..1000);
            let expected_gc_round = latest_ledger_round - max_gc_rounds;

            // Sample a committee for the latest ledger round.
            let committee =
                snarkvm::ledger::committee::test_helpers::sample_committee_for_round(latest_ledger_round, rng);

            // Set up a mock ledger that only serves the current committee.
            let mut mock_ledger = MockLedger::default();
            mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));

            let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
            let storage =
                Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);

            // The storage starts at the expected garbage collection round.
            assert_eq!(storage.gc_round(), expected_gc_round);
        }
    }
}
975
#[cfg(test)]
mod prop_tests {
    use super::*;
    use crate::Gateway;
    use snarkos_node_bft_ledger_service::MockLedgerService;
    use snarkvm::{
        console::account::Address,
        ledger::committee::{Committee, MIN_VALIDATOR_STAKE},
    };

    use test_strategy::proptest;

    type CurrentNetwork = snarkvm::prelude::MainnetV0;

    /// Returns a committee of `n` members, each staking `MIN_VALIDATOR_STAKE`.
    ///
    /// Every member is sampled from an RNG seeded with its index.
    fn new_test_committee(n: u16) -> Committee<CurrentNetwork> {
        let mut members = IndexMap::with_capacity(n as usize);
        for i in 0..n {
            // Seed the RNG with the member index.
            let rng = &mut TestRng::fixed(i as u64);
            let address = Address::new(rng.gen());
            info!("Validator {i}: {address}");
            members.insert(address, (MIN_VALIDATOR_STAKE, false, rng.gen_range(0..100)));
        }
        // Initialize the committee for round 1.
        Committee::<CurrentNetwork>::new(1u64, members).unwrap()
    }

    /// Any worker ID below `MAX_WORKERS` is accepted, and is reported back by `id()`.
    #[proptest]
    fn worker_initialization(
        #[strategy(0..MAX_WORKERS)] id: u8,
        gateway: Gateway<CurrentNetwork>,
        storage: Storage<CurrentNetwork>,
    ) {
        let committee = new_test_committee(4);
        let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(MockLedgerService::new(committee));
        let worker = Worker::new(id, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
        assert_eq!(worker.id(), id);
    }

    /// Any worker ID at or above `MAX_WORKERS` is rejected with the expected error message.
    #[proptest]
    fn invalid_worker_id(
        #[strategy(MAX_WORKERS..)] id: u8,
        gateway: Gateway<CurrentNetwork>,
        storage: Storage<CurrentNetwork>,
    ) {
        let committee = new_test_committee(4);
        let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(MockLedgerService::new(committee));
        let worker = Worker::new(id, Arc::new(gateway), storage, ledger, Default::default());
        // Fail the test if construction succeeds, instead of silently skipping the assertion.
        // Note: `unwrap_err` is avoided here, as it requires the `Ok` type to implement `Debug`.
        match worker {
            Ok(_) => panic!("Worker::new accepted the out-of-range worker ID '{id}'"),
            Err(error) => assert_eq!(error.to_string(), format!("Invalid worker ID '{}'", id)),
        }
    }
}