use crate::{
base_node::{
comms_interface::{Broadcast, CommsInterfaceError, LocalNodeCommsInterface},
state_machine_service::states::{StateEvent, SyncStatus},
},
blocks::{Block, BlockHeader, NewBlockTemplate},
consensus::ConsensusManager,
mempool::MempoolStateEvent,
mining::{cpu_miner::CpuPow, error::MinerError, MinerInstruction},
proof_of_work::PowAlgorithm,
transactions::{
transaction::UnblindedOutput,
types::{CryptoFactories, PrivateKey},
CoinbaseBuilder,
},
};
use core::sync::atomic::{AtomicBool, AtomicU64};
use futures::{
channel::{
mpsc,
mpsc::{Receiver as mpscReceiver, Sender as mpscSender},
},
pin_mut,
StreamExt,
};
use log::*;
use rand::rngs::OsRng;
use std::sync::{atomic::Ordering, Arc};
use tari_crypto::keys::SecretKey;
use tari_shutdown::ShutdownSignal;
use tokio::{sync::broadcast, task, task::spawn_blocking};
pub const LOG_TARGET: &str = "c::m::miner";
pub struct Miner {
kill_signal: ShutdownSignal,
stop_mining_flag: Arc<AtomicBool>,
consensus: ConsensusManager,
node_interface: LocalNodeCommsInterface,
utxo_sender: mpscSender<UnblindedOutput>,
node_state_event_rx: Option<broadcast::Receiver<Arc<StateEvent>>>,
mempool_state_event_rx: Option<broadcast::Receiver<MempoolStateEvent>>,
miner_instruction_events: broadcast::Sender<MinerInstruction>,
threads: usize,
mining_enabled_by_user: Arc<AtomicBool>,
mining_status: Arc<AtomicBool>,
hashrate: Arc<AtomicU64>,
}
impl Miner {
pub fn new(
kill_signal: ShutdownSignal,
consensus: ConsensusManager,
node_interface: &LocalNodeCommsInterface,
threads: usize,
) -> Miner
{
let (utxo_sender, _): (mpscSender<UnblindedOutput>, mpscReceiver<UnblindedOutput>) = mpsc::channel(1);
let (miner_instruction_events, _): (
broadcast::Sender<MinerInstruction>,
broadcast::Receiver<MinerInstruction>,
) = broadcast::channel(10);
Miner {
kill_signal,
consensus,
stop_mining_flag: Arc::new(AtomicBool::new(false)),
node_interface: node_interface.clone(),
utxo_sender,
node_state_event_rx: None,
mempool_state_event_rx: None,
miner_instruction_events,
threads,
mining_enabled_by_user: Arc::new(AtomicBool::new(false)),
mining_status: Arc::new(AtomicBool::new(false)),
hashrate: Arc::new(AtomicU64::new(0)),
}
}
pub fn get_utxo_receiver_channel(&mut self) -> mpscReceiver<UnblindedOutput> {
let (sender, receiver): (mpscSender<UnblindedOutput>, mpscReceiver<UnblindedOutput>) = mpsc::channel(20);
self.utxo_sender = sender;
receiver
}
pub fn get_miner_instruction_events_sender_channel(&self) -> broadcast::Sender<MinerInstruction> {
self.miner_instruction_events.clone()
}
pub fn subscribe_to_node_state_events(&mut self, state_change_event_rx: broadcast::Receiver<Arc<StateEvent>>) {
self.node_state_event_rx = Some(state_change_event_rx);
}
pub fn subscribe_to_mempool_state_events(&mut self, state_event_rx: broadcast::Receiver<MempoolStateEvent>) {
self.mempool_state_event_rx = Some(state_event_rx);
}
pub fn enable_mining_flag(&self) -> Arc<AtomicBool> {
self.mining_enabled_by_user.clone()
}
pub fn mining_status_flag(&self) -> Arc<AtomicBool> {
self.mining_status.clone()
}
pub fn get_hashrate_u64(&self) -> Arc<AtomicU64> {
self.hashrate.clone()
}
async fn mining(mut self) -> Result<Miner, MinerError> {
info!(target: LOG_TARGET, "Miner asking for new candidate block to mine.");
let block_template = self.get_block_template().await;
if block_template.is_err() {
error!(
target: LOG_TARGET,
"Could not get block template from basenode {:?}.", block_template
);
return Ok(self);
};
let mut block_template = block_template.unwrap();
let target_difficulty = block_template.target_difficulty;
let output = self.add_coinbase(&mut block_template);
if output.is_err() {
error!(
target: LOG_TARGET,
"Could not add coinbase to block template {:?}.", output
);
return Ok(self);
};
let output = output.unwrap();
let block = self.get_block(block_template).await;
if block.is_err() {
error!(target: LOG_TARGET, "Could not get block from basenode {:?}.", block);
return Ok(self);
};
self.mining_status.store(true, Ordering::Relaxed);
let mut block = block.unwrap();
debug!(target: LOG_TARGET, "Miner got new block to mine.");
let (tx, mut rx): (mpscSender<Option<BlockHeader>>, mpscReceiver<Option<BlockHeader>>) =
mpsc::channel(self.threads);
for _ in 0..self.threads {
let stop_mining_flag = self.stop_mining_flag.clone();
let header = block.header.clone();
let thread_hash_rate = self.hashrate.clone();
let mut tx_channel = tx.clone();
trace!("spawning mining thread");
spawn_blocking(move || {
let result = CpuPow::mine(header, target_difficulty, stop_mining_flag, thread_hash_rate);
if let Err(e) = tx_channel.try_send(result) {
warn!(target: LOG_TARGET, "Could not return mining result: {}", e);
}
});
}
drop(tx); while let Some(value) = rx.next().await {
if let Some(r) = value {
self.stop_mining_flag.store(true, Ordering::Relaxed);
block.header = r;
let block_sent = self.send_block(block).await;
if let Err(e) = block_sent {
error!(target: LOG_TARGET, "Could not send block to base node. {:?}.", e);
break;
}
let utxo_sent = self.utxo_sender.try_send(output);
if let Err(e) = utxo_sent {
error!(target: LOG_TARGET, "Could not send utxo to wallet. {:?}.", e);
return Err(MinerError::CommunicationError(e.to_string()));
}
break;
}
}
trace!("returning closing thread");
Ok(self)
}
async fn not_mining(self) -> Result<Miner, MinerError> {
self.mining_status.store(false, Ordering::Relaxed);
Ok(self)
}
pub async fn mine(mut self) {
let stop_mining_flag = self.stop_mining_flag.clone();
let mining_enabled_by_user = self.mining_enabled_by_user.clone();
let mempool_state_event = self
.mempool_state_event_rx
.take()
.expect("Miner does not have access to the mempool state event stream")
.fuse();
let blockchain_event = self
.node_state_event_rx
.take()
.expect("Miner does not have access to state event stream")
.fuse();
let mining_instruction_event = self.miner_instruction_events.subscribe().fuse();
let mut kill_signal = self.kill_signal.clone();
pin_mut!(mempool_state_event);
pin_mut!(blockchain_event);
pin_mut!(mining_instruction_event);
let mut spawn_mining_task = true;
trace!("starting mining thread");
'main: loop {
stop_mining_flag.store(false, Ordering::Relaxed); if !mining_enabled_by_user.load(Ordering::Relaxed) {
spawn_mining_task = false;
}
#[allow(clippy::match_bool)]
let mining_future = match spawn_mining_task {
true => task::spawn(self.mining()),
false => task::spawn(self.not_mining()),
};
let mut wait_for_mining_event = true;
while wait_for_mining_event {
futures::select! {
mempool_event = mempool_state_event.select_next_some() => {
if let Ok(mempool_event) = mempool_event {
match mempool_event {
MempoolStateEvent::Updated => {
stop_mining_flag.store(true, Ordering::Relaxed);
spawn_mining_task = true;
wait_for_mining_event = false;
},
}
}
},
blockchain_event = blockchain_event.select_next_some() => {
if let Ok(blockchain_event) = blockchain_event {
use StateEvent::*;
match &*blockchain_event {
BlocksSynchronized | NetworkSilence => {
info!(target: LOG_TARGET,
"Our chain has synchronised with the network, or is a seed node. Starting miner");
stop_mining_flag.store(true, Ordering::Relaxed);
spawn_mining_task = true;
wait_for_mining_event = false;
},
FallenBehind(SyncStatus::Lagging(_, _)) => {
info!(target: LOG_TARGET, "Our chain has fallen behind the network. Pausing miner");
stop_mining_flag.store(true, Ordering::Relaxed);
spawn_mining_task = false;
wait_for_mining_event = false;
},
_ => {
stop_mining_flag.store(true, Ordering::Relaxed);
wait_for_mining_event = false;
},
}
}
},
instruction = mining_instruction_event.select_next_some() => {
use MinerInstruction::*;
match instruction.unwrap_or_else(|_| (IgnoreInstruction)) {
PauseMining => {
info!(target: LOG_TARGET, "Mining pause event received from the CLI");
stop_mining_flag.store(true, Ordering::Relaxed);
mining_enabled_by_user.store(false, Ordering::Relaxed);
spawn_mining_task = false;
wait_for_mining_event = false;
},
StartMining => {
info!(target: LOG_TARGET, "Mining start event received from the CLI");
mining_enabled_by_user.store(true, Ordering::Relaxed);
spawn_mining_task = true;
wait_for_mining_event = false;
},
_ => (),
}
},
_ = kill_signal => {
info!(target: LOG_TARGET, "Mining kill signal received! Miner is shutting down");
stop_mining_flag.store(true, Ordering::Relaxed);
break 'main;
}
};
}
self = mining_future.await.expect("Miner crashed").expect("Miner crashed");
}
debug!(target: LOG_TARGET, "Mining thread stopped.");
}
pub async fn get_block_template(&mut self) -> Result<NewBlockTemplate, MinerError> {
trace!(target: LOG_TARGET, "Requesting new block template from node.");
Ok(self
.node_interface
.get_new_block_template(PowAlgorithm::Sha3)
.await
.map_err(|e| {
error!(
target: LOG_TARGET,
"Could not get a new block template from the base node. {:?}.", e
);
e
})
.map_err(|e| MinerError::CommunicationError(e.to_string()))?)
}
pub async fn get_block(&mut self, block: NewBlockTemplate) -> Result<Block, MinerError> {
trace!(
target: LOG_TARGET,
"Asking node to fill in MMR roots for new block candidate"
);
Ok(self
.node_interface
.get_new_block(block)
.await
.map_err(|e| {
error!(
target: LOG_TARGET,
"Could not get a new block from the base node. {:?}.", e
);
e
})
.map_err(|e| MinerError::CommunicationError(e.to_string()))?)
}
fn add_coinbase(&self, block: &mut NewBlockTemplate) -> Result<UnblindedOutput, MinerError> {
let fees = block.body.get_total_fee();
let (key, r) = self.get_spending_key()?;
let factories = CryptoFactories::default();
let builder = CoinbaseBuilder::new(factories);
let builder = builder
.with_block_height(block.header.height)
.with_fees(fees)
.with_nonce(r)
.with_spend_key(key);
let (tx, unblinded_output) = builder
.build(
self.consensus.consensus_constants(block.header.height),
self.consensus.emission_schedule(),
)
.expect("invalid constructed coinbase");
block.body.add_output(tx.body.outputs()[0].clone());
block.body.add_kernel(tx.body.kernels()[0].clone());
Ok(unblinded_output)
}
pub fn get_spending_key(&self) -> Result<(PrivateKey, PrivateKey), MinerError> {
let r = PrivateKey::random(&mut OsRng);
let key = PrivateKey::random(&mut OsRng);
Ok((key, r))
}
async fn send_block(&mut self, block: Block) -> Result<(), MinerError> {
info!(target: LOG_TARGET, "Mined a block: {}", block);
match self.node_interface.submit_block(block, Broadcast::from(true)).await {
Ok(_) => {
trace!(target: LOG_TARGET, "Miner successfully submitted block");
Ok(())
},
Err(CommsInterfaceError::ChainStorageError(e)) => {
error!(target: LOG_TARGET, "Miner submitted invalid block. {:?}.", e);
Ok(())
},
Err(e) => {
error!(target: LOG_TARGET, "Could not send block to base node. {:?}.", e);
Err(MinerError::CommunicationError(e.to_string()))
},
}
}
}