1#![forbid(unsafe_code)]
17
18mod transactions_queue;
19use transactions_queue::TransactionsQueue;
20
21#[macro_use]
22extern crate tracing;
23
24#[cfg(feature = "metrics")]
25extern crate snarkos_node_metrics as metrics;
26
27use snarkos_account::Account;
28use snarkos_node_bft::{
29 BFT,
30 MAX_BATCH_DELAY,
31 Primary,
32 helpers::{
33 ConsensusReceiver,
34 PrimarySender,
35 Storage as NarwhalStorage,
36 fmt_id,
37 init_consensus_channels,
38 init_primary_channels,
39 },
40 spawn_blocking,
41};
42use snarkos_node_bft_ledger_service::LedgerService;
43use snarkos_node_bft_storage_service::BFTPersistentStorage;
44use snarkos_node_sync::{BlockSync, Ping};
45use snarkos_utilities::NodeDataDir;
46
47use snarkvm::{
48 ledger::{
49 CheckBlockError,
50 block::Transaction,
51 narwhal::{BatchHeader, Data, Subdag, Transmission, TransmissionID},
52 puzzle::{Solution, SolutionID},
53 },
54 prelude::*,
55 utilities::flatten_error,
56};
57
58use aleo_std::StorageMode;
59use anyhow::{Context, Result};
60use cfg_if::cfg_if;
61use colored::Colorize;
62use indexmap::IndexMap;
63#[cfg(feature = "locktick")]
64use locktick::parking_lot::{Mutex, RwLock};
65use lru::LruCache;
66#[cfg(not(feature = "locktick"))]
67use parking_lot::{Mutex, RwLock};
68use std::{future::Future, net::SocketAddr, num::NonZeroUsize, sync::Arc};
69use tokio::{
70 sync::{Notify, oneshot},
71 task::JoinHandle,
72};
73
74#[cfg(feature = "metrics")]
75use std::collections::HashMap;
76
77const CAPACITY_FOR_DEPLOYMENTS: usize = 1 << 10;
80const CAPACITY_FOR_EXECUTIONS: usize = 1 << 10;
83const CAPACITY_FOR_SOLUTIONS: usize = 1 << 10;
86const MAX_DEPLOYMENTS_PER_INTERVAL: usize = 1;
89
90#[derive(Clone)]
97pub struct Consensus<N: Network> {
98 ledger: Arc<dyn LedgerService<N>>,
100 bft: BFT<N>,
102 primary_sender: PrimarySender<N>,
104 solutions_queue: Arc<Mutex<LruCache<SolutionID<N>, Solution<N>>>>,
106 transactions_queue: Arc<RwLock<TransactionsQueue<N>>>,
108 seen_solutions: Arc<Mutex<LruCache<SolutionID<N>, ()>>>,
110 seen_transactions: Arc<Mutex<LruCache<N::TransactionID, ()>>>,
112 #[cfg(feature = "metrics")]
113 transmissions_tracker: Arc<Mutex<HashMap<TransmissionID<N>, i64>>>,
114 handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
116 ping: Arc<Ping<N>>,
118 block_sync: Arc<BlockSync<N>>,
120 block_commit_notify: Arc<Notify>,
122}
123
124impl<N: Network> Consensus<N> {
125 #[allow(clippy::too_many_arguments)]
127 pub async fn new(
128 account: Account<N>,
129 ledger: Arc<dyn LedgerService<N>>,
130 block_sync: Arc<BlockSync<N>>,
131 ip: Option<SocketAddr>,
132 trusted_validators: &[SocketAddr],
133 trusted_peers_only: bool,
134 storage_mode: StorageMode,
135 node_data_dir: NodeDataDir,
136 ping: Arc<Ping<N>>,
137 dev: Option<u16>,
138 ) -> Result<Self> {
139 let (primary_sender, primary_receiver) = init_primary_channels::<N>();
141 let transmissions = Arc::new(BFTPersistentStorage::open(storage_mode.clone())?);
143 let storage = NarwhalStorage::new(ledger.clone(), transmissions, BatchHeader::<N>::MAX_GC_ROUNDS as u64)
145 .with_context(|| "Failed to initialize the BFT storage")?;
146 let bft = BFT::new(
148 account,
149 storage,
150 ledger.clone(),
151 block_sync.clone(),
152 ip,
153 trusted_validators,
154 trusted_peers_only,
155 node_data_dir,
156 dev,
157 )?;
158 let mut _self = Self {
160 ledger,
161 bft,
162 block_sync,
163 primary_sender,
164 solutions_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_SOLUTIONS).unwrap()))),
165 transactions_queue: Default::default(),
166 seen_solutions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))),
167 seen_transactions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))),
168 #[cfg(feature = "metrics")]
169 transmissions_tracker: Default::default(),
170 handles: Default::default(),
171 ping: ping.clone(),
172 block_commit_notify: Arc::new(Notify::new()),
173 };
174
175 info!("Starting the consensus instance...");
176
177 let (consensus_sender, consensus_receiver) = init_consensus_channels();
179 _self.start_handlers(consensus_receiver);
181 _self.bft.run(Some(ping), Some(consensus_sender), _self.primary_sender.clone(), primary_receiver).await?;
183
184 Ok(_self)
185 }
186
187 pub const fn bft(&self) -> &BFT<N> {
189 &self.bft
190 }
191
192 pub fn contains_transaction(&self, transaction_id: &N::TransactionID) -> bool {
193 self.transactions_queue.read().contains(transaction_id)
194 }
195}
196
197impl<N: Network> Consensus<N> {
198 pub fn num_unconfirmed_transmissions(&self) -> usize {
200 self.bft.num_unconfirmed_transmissions()
201 }
202
203 pub fn num_unconfirmed_ratifications(&self) -> usize {
205 self.bft.num_unconfirmed_ratifications()
206 }
207
208 pub fn num_unconfirmed_solutions(&self) -> usize {
210 self.bft.num_unconfirmed_solutions()
211 }
212
213 pub fn num_unconfirmed_transactions(&self) -> usize {
215 self.bft.num_unconfirmed_transactions()
216 }
217}
218
219impl<N: Network> Consensus<N> {
220 pub fn unconfirmed_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
222 self.worker_transmission_ids().chain(self.inbound_transmission_ids())
223 }
224
225 pub fn unconfirmed_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
227 self.worker_transmissions().chain(self.inbound_transmissions())
228 }
229
230 pub fn unconfirmed_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
232 self.worker_solutions().chain(self.inbound_solutions())
233 }
234
235 pub fn unconfirmed_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
237 self.worker_transactions().chain(self.inbound_transactions())
238 }
239}
240
241impl<N: Network> Consensus<N> {
242 pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
244 self.bft.worker_transmission_ids()
245 }
246
247 pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
249 self.bft.worker_transmissions()
250 }
251
252 pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
254 self.bft.worker_solutions()
255 }
256
257 pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
259 self.bft.worker_transactions()
260 }
261}
262
263impl<N: Network> Consensus<N> {
264 pub fn inbound_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
266 self.inbound_transmissions().map(|(id, _)| id)
267 }
268
269 pub fn inbound_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
271 self.inbound_transactions()
272 .map(|(id, tx)| {
273 (
274 TransmissionID::Transaction(id, tx.to_checksum::<N>().unwrap_or_default()),
275 Transmission::Transaction(tx),
276 )
277 })
278 .chain(self.inbound_solutions().map(|(id, solution)| {
279 (
280 TransmissionID::Solution(id, solution.to_checksum::<N>().unwrap_or_default()),
281 Transmission::Solution(solution),
282 )
283 }))
284 }
285
286 pub fn inbound_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
288 self.solutions_queue.lock().clone().into_iter().map(|(id, solution)| (id, Data::Object(solution)))
290 }
291
292 pub fn inbound_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
294 self.transactions_queue.read().transactions().map(|(id, tx)| (id, Data::Object(tx)))
296 }
297}
298
299impl<N: Network> Consensus<N> {
300 pub async fn add_unconfirmed_solution(&self, solution: Solution<N>) -> Result<()> {
303 let checksum = Data::<Solution<N>>::Buffer(solution.to_bytes_le()?.into()).to_checksum::<N>()?;
305 {
307 let solution_id = solution.id();
308
309 if self.seen_solutions.lock().put(solution_id, ()).is_some() {
311 return Ok(());
313 }
314 if self.ledger.contains_transmission(&TransmissionID::Solution(solution_id, checksum))? {
316 bail!("Solution '{}' exists in the ledger {}", fmt_id(solution_id), "(skipping)".dimmed());
317 }
318 #[cfg(feature = "metrics")]
319 {
320 metrics::increment_gauge(metrics::consensus::UNCONFIRMED_SOLUTIONS, 1f64);
321 let timestamp = snarkos_node_bft::helpers::now();
322 self.transmissions_tracker.lock().insert(TransmissionID::Solution(solution.id(), checksum), timestamp);
323 }
324 trace!("Received unconfirmed solution '{}' in the queue", fmt_id(solution_id));
326 if self.solutions_queue.lock().put(solution_id, solution).is_some() {
327 bail!("Solution '{}' exists in the memory pool", fmt_id(solution_id));
328 }
329 }
330
331 self.process_unconfirmed_solutions().await
333 }
334
335 async fn process_unconfirmed_solutions(&self) -> Result<()> {
338 let num_unconfirmed_solutions = self.num_unconfirmed_solutions();
340 let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions();
341 if num_unconfirmed_solutions >= N::MAX_SOLUTIONS
342 || num_unconfirmed_transmissions >= Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE
343 {
344 return Ok(());
345 }
346 let solutions = {
348 let capacity = N::MAX_SOLUTIONS.saturating_sub(num_unconfirmed_solutions);
350 let mut queue = self.solutions_queue.lock();
352 let num_solutions = queue.len().min(capacity);
354 (0..num_solutions).filter_map(|_| queue.pop_lru().map(|(_, solution)| solution)).collect::<Vec<_>>()
356 };
357 for solution in solutions.into_iter() {
359 let solution_id = solution.id();
360 trace!("Adding unconfirmed solution '{}' to the memory pool...", fmt_id(solution_id));
361 match self.primary_sender.send_unconfirmed_solution(solution_id, Data::Object(solution)).await {
363 Ok(true) => {}
364 Ok(false) => debug!(
365 "Unable to add unconfirmed solution '{}' to the memory pool. Already exists.",
366 fmt_id(solution_id)
367 ),
368 Err(err) => {
369 let err = err.context(format!(
370 "Unable to add unconfirmed solution '{}' to the memory pool",
371 fmt_id(solution_id)
372 ));
373
374 if self.bft.is_synced() && self.ledger.latest_block_height() % N::NUM_BLOCKS_PER_EPOCH > 10 {
376 warn!("{}", flatten_error(err));
377 } else {
378 trace!("{}", flatten_error(err));
379 }
380 }
381 }
382 }
383 Ok(())
384 }
385
386 pub async fn add_unconfirmed_transaction(&self, transaction: Transaction<N>) -> Result<()> {
389 let checksum = Data::<Transaction<N>>::Buffer(transaction.to_bytes_le()?.into()).to_checksum::<N>()?;
391 {
393 let transaction_id = transaction.id();
394
395 if transaction.is_fee() {
397 bail!("Transaction '{}' is a fee transaction {}", fmt_id(transaction_id), "(skipping)".dimmed());
398 }
399 if self.seen_transactions.lock().put(transaction_id, ()).is_some() {
401 return Ok(());
403 }
404 if self.ledger.contains_transmission(&TransmissionID::Transaction(transaction_id, checksum))? {
406 bail!("Transaction '{}' exists in the ledger {}", fmt_id(transaction_id), "(skipping)".dimmed());
407 }
408 if self.contains_transaction(&transaction_id) {
410 bail!("Transaction '{}' exists in the memory pool", fmt_id(transaction_id));
411 }
412 #[cfg(feature = "metrics")]
413 {
414 metrics::increment_gauge(metrics::consensus::UNCONFIRMED_TRANSACTIONS, 1f64);
415 let timestamp = snarkos_node_bft::helpers::now();
416 self.transmissions_tracker
417 .lock()
418 .insert(TransmissionID::Transaction(transaction.id(), checksum), timestamp);
419 }
420 trace!("Received unconfirmed transaction '{}' in the queue", fmt_id(transaction_id));
422 let priority_fee = transaction.priority_fee_amount()?;
423 self.transactions_queue.write().insert(transaction_id, transaction, priority_fee)?;
424 }
425
426 self.process_unconfirmed_transactions().await
428 }
429
430 async fn process_unconfirmed_transactions(&self) -> Result<()> {
433 let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions();
435 if num_unconfirmed_transmissions >= Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE {
436 return Ok(());
437 }
438 let transactions = {
440 let capacity = Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE.saturating_sub(num_unconfirmed_transmissions);
442 let mut tx_queue = self.transactions_queue.write();
444 let num_deployments = tx_queue.deployments.len().min(capacity).min(MAX_DEPLOYMENTS_PER_INTERVAL);
446 let num_executions = tx_queue.executions.len().min(capacity.saturating_sub(num_deployments));
448 let selector_iter = (0..num_deployments).map(|_| true).interleave((0..num_executions).map(|_| false));
451 selector_iter
453 .filter_map(
454 |select_deployment| {
455 if select_deployment { tx_queue.deployments.pop() } else { tx_queue.executions.pop() }
456 },
457 )
458 .map(|(_, tx)| tx)
459 .collect_vec()
460 };
461 for transaction in transactions.into_iter() {
463 let transaction_id = transaction.id();
464 let tx_type_str = match transaction {
466 Transaction::Deploy(..) => "deployment",
467 Transaction::Execute(..) => "execution",
468 Transaction::Fee(..) => "fee",
469 };
470 trace!("Adding unconfirmed {tx_type_str} transaction '{}' to the memory pool...", fmt_id(transaction_id));
471 match self.primary_sender.send_unconfirmed_transaction(transaction_id, Data::Object(transaction)).await {
473 Ok(true) => {}
474 Ok(false) => debug!(
475 "Unable to add unconfirmed {tx_type_str} transaction '{}' to the memory pool. Already exists.",
476 fmt_id(transaction_id)
477 ),
478 Err(err) => {
479 if self.bft.is_synced() {
481 let err = err.context(format!(
482 "Unable to add unconfirmed {tx_type_str} transaction '{}' to the memory pool",
483 fmt_id(transaction_id)
484 ));
485 warn!("{}", flatten_error(err));
486 }
487 }
488 }
489 }
490 Ok(())
491 }
492}
493
494impl<N: Network> Consensus<N> {
495 fn start_handlers(&self, consensus_receiver: ConsensusReceiver<N>) {
499 let ConsensusReceiver { mut rx_consensus_subdag } = consensus_receiver;
500
501 let self_ = self.clone();
503 self.spawn(async move {
504 while let Some((committed_subdag, transmissions, callback)) = rx_consensus_subdag.recv().await {
505 self_.process_bft_subdag(committed_subdag, transmissions, callback).await;
506 }
507 });
508
509 let self_ = self.clone();
517 self.spawn(async move {
518 loop {
519 tokio::select! {
521 _ = self_.block_commit_notify.notified() => {}
522 _ = tokio::time::sleep(MAX_BATCH_DELAY) => {}
523 }
524 if let Err(err) = self_.process_unconfirmed_transactions().await {
526 warn!("{}", flatten_error(err.context("Cannot process unconfirmed transactions")));
527 }
528 if let Err(err) = self_.process_unconfirmed_solutions().await {
530 warn!("{}", flatten_error(err.context("Cannot process unconfirmed solutions")));
531 }
532 }
533 });
534 }
535
536 async fn process_bft_subdag(
543 &self,
544 subdag: Subdag<N>,
545 transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
546 callback: oneshot::Sender<Result<bool>>,
547 ) {
548 let self_ = self.clone();
550 let transmissions_ = transmissions.clone();
551 let result = spawn_blocking! { self_.try_advance_to_next_block(subdag, transmissions_).with_context(|| "Unable to advance to the next block") };
552
553 match result {
555 Ok(true) => {
556 self.block_commit_notify.notify_one();
558 }
559 Ok(false) | Err(_) => self.reinsert_transmissions(transmissions).await,
560 }
561
562 callback.send(result).ok();
563 }
564
565 fn try_advance_to_next_block(
572 &self,
573 subdag: Subdag<N>,
574 transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
575 ) -> Result<bool> {
576 #[cfg(feature = "metrics")]
577 let start = subdag.leader_certificate().batch_header().timestamp();
578 #[cfg(feature = "metrics")]
579 let num_committed_certificates = subdag.values().map(|c| c.len()).sum::<usize>();
580 #[cfg(feature = "metrics")]
581 let current_block_timestamp = self.ledger.latest_block().header().metadata().timestamp();
582
583 let ledger_update = self.ledger.begin_ledger_update()?;
585
586 let prepare_instant = std::time::Instant::now();
587 let block = match ledger_update.prepare_advance_to_next_quorum_block(subdag, transmissions) {
588 Ok(block) => block,
589 Err(err) => return Err(err.into_anyhow()),
590 };
591 let prepare_elapsed = prepare_instant.elapsed();
592 trace!("prepare_advance_to_next_quorum_block took {:.3}s", prepare_elapsed.as_secs_f64());
593 #[cfg(feature = "metrics")]
594 metrics::histogram(metrics::consensus::PREPARE_ADVANCE_SECS, prepare_elapsed.as_secs_f64());
595
596 let check_instant = std::time::Instant::now();
597 cfg_if! {
598 if #[cfg(feature = "test_network")] {
599 let result = if self.ledger.dev_committee_for_round(block.round())?.is_some() {
601 Ok(block)
602 } else {
603 ledger_update.check_next_block(block)
604 };
605 } else {
606 let result = ledger_update.check_next_block(block);
607 }
608 }
609
610 let block = match result {
611 Ok(block) => block,
612 Err(CheckBlockError::BlockAlreadyExists { .. }) => {
613 debug!("The given block hash already exists in the ledger");
614 return Ok(false);
615 }
616 Err(CheckBlockError::InvalidHeight { .. }) => {
617 debug!("The ledger advanced while we were constructing the next block");
618 return Ok(false);
619 }
620 Err(CheckBlockError::InvalidRound { new, previous }) => {
621 debug!("The subDAG round is too low. Expected >{previous}, got {new}");
622 return Ok(false);
623 }
624 Err(err) => return Err(err.into_anyhow()),
625 };
626
627 let check_elapsed = check_instant.elapsed();
628 trace!("check_next_block took {:.3}s", check_elapsed.as_secs_f64());
629 #[cfg(feature = "metrics")]
630 metrics::histogram(metrics::consensus::CHECK_NEXT_BLOCK_SECS, check_elapsed.as_secs_f64());
631
632 let block_height = block.height();
633
634 let advance_instant = std::time::Instant::now();
636 ledger_update.advance_to_next_block(&block)?;
637 let advance_elapsed = advance_instant.elapsed();
638 trace!("advance_to_next_block took {:.3}s", advance_elapsed.as_secs_f64());
639 #[cfg(feature = "metrics")]
640 metrics::histogram(metrics::consensus::ADVANCE_TO_NEXT_BLOCK_SECS, advance_elapsed.as_secs_f64());
641
642 #[cfg(feature = "telemetry")]
643 let latest_committee = self.ledger.get_committee_lookback_for_round(self.ledger.latest_round());
647
648 if block_height.is_multiple_of(N::NUM_BLOCKS_PER_EPOCH) {
650 self.solutions_queue.lock().clear();
652 self.bft.primary().clear_worker_solutions();
654 }
655
656 match self.block_sync.get_block_locators() {
658 Ok(locators) => self.ping.update_block_locators(locators),
659 Err(err) => error!(
660 "{}",
661 flatten_error(err.context("Failed to generate new block locators after block advancement"))
662 ),
663 }
664
665 self.block_sync.set_sync_height(block_height);
667
668 #[cfg(feature = "metrics")]
672 {
673 let now_utc = snarkos_node_bft::helpers::now_utc();
674 let elapsed = std::time::Duration::from_secs((now_utc.unix_timestamp() - start) as u64);
675 let next_block_timestamp = block.header().metadata().timestamp();
676 let next_block_utc = snarkos_node_bft::helpers::to_utc_datetime(next_block_timestamp);
677 let block_latency = next_block_timestamp - current_block_timestamp;
678 let block_lag = (now_utc - next_block_utc).whole_milliseconds();
679
680 let proof_target = block.header().proof_target();
681 let coinbase_target = block.header().coinbase_target();
682 let cumulative_proof_target = block.header().cumulative_proof_target();
683
684 metrics::add_transmission_latency_metric(&self.transmissions_tracker, &block);
686
687 metrics::gauge(metrics::consensus::COMMITTED_CERTIFICATES, num_committed_certificates as f64);
688 metrics::histogram(metrics::consensus::CERTIFICATE_COMMIT_LATENCY, elapsed.as_secs_f64());
689 metrics::histogram(metrics::consensus::BLOCK_LATENCY, block_latency as f64);
690 metrics::histogram(metrics::consensus::BLOCK_LAG, block_lag as f64);
691 metrics::gauge(metrics::blocks::PROOF_TARGET, proof_target as f64);
692 metrics::gauge(metrics::blocks::COINBASE_TARGET, coinbase_target as f64);
693 metrics::gauge(metrics::blocks::CUMULATIVE_PROOF_TARGET, cumulative_proof_target as f64);
694
695 #[cfg(feature = "telemetry")]
697 {
698 match latest_committee {
699 Ok(latest_committee) => {
700 let participation_scores = self
702 .bft()
703 .primary()
704 .gateway()
705 .validator_telemetry()
706 .get_participation_scores(&latest_committee);
707
708 for (address, (certificate_score, signature_score)) in participation_scores {
710 let address_str = address.to_string();
711 metrics::gauge_label(
712 metrics::consensus::VALIDATOR_CERTIFICATE_PARTICIPATION,
713 "validator_address",
714 address_str.clone(),
715 certificate_score,
716 );
717 metrics::gauge_label(
718 metrics::consensus::VALIDATOR_SIGNATURE_PARTICIPATION,
719 "validator_address",
720 address_str,
721 signature_score,
722 );
723 }
724 }
725 Err(err) => warn!("{}", flatten_error(err.context("Failed to get latest committee for telemetry"))),
726 }
727 }
728 }
729
730 Ok(true)
731 }
732
733 async fn reinsert_transmissions(&self, transmissions: IndexMap<TransmissionID<N>, Transmission<N>>) {
735 for (transmission_id, transmission) in transmissions.into_iter() {
737 match self.reinsert_transmission(transmission_id, transmission).await {
739 Ok(true) => {}
740 Ok(false) => debug!(
741 "Unable to reinsert transmission {}:{} into the memory pool. Already exists.",
742 fmt_id(transmission_id),
743 fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
744 ),
745 Err(err) => {
746 let err = err.context(format!(
747 "Unable to reinsert transmission {}.{} into the memory pool",
748 fmt_id(transmission_id),
749 fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
750 ));
751 warn!("{}", flatten_error(err));
752 }
753 }
754 }
755 }
756
757 async fn reinsert_transmission(
764 &self,
765 transmission_id: TransmissionID<N>,
766 transmission: Transmission<N>,
767 ) -> Result<bool> {
768 let (callback, callback_receiver) = oneshot::channel();
770 match (transmission_id, transmission) {
772 (TransmissionID::Ratification, Transmission::Ratification) => return Ok(true),
773 (TransmissionID::Solution(solution_id, _), Transmission::Solution(solution)) => {
774 self.primary_sender.tx_unconfirmed_solution.send((solution_id, solution, callback)).await?;
776 }
777 (TransmissionID::Transaction(transaction_id, _), Transmission::Transaction(transaction)) => {
778 self.primary_sender.tx_unconfirmed_transaction.send((transaction_id, transaction, callback)).await?;
780 }
781 _ => bail!("Mismatching `(transmission_id, transmission)` pair in consensus"),
782 }
783 callback_receiver.await?
785 }
786
787 fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
789 self.handles.lock().push(tokio::spawn(future));
790 }
791
792 pub async fn shut_down(&self) {
794 info!("Shutting down consensus...");
795 self.bft.shut_down().await;
797 self.handles.lock().iter().for_each(|handle| handle.abort());
799 }
800}