use std::{cmp::min, sync::Arc, thread::available_parallelism, time::Duration};
use color_eyre::Report;
use futures::{stream::FuturesUnordered, StreamExt};
use thread_priority::{ThreadBuilder, ThreadPriority};
use tokio::{select, sync::watch, task::JoinHandle, time::sleep};
use tower::Service;
use tracing::{Instrument, Span};
use zebra_chain::{
block::{self, Block},
chain_sync_status::ChainSyncStatus,
chain_tip::ChainTip,
diagnostic::task::WaitForPanics,
serialization::{AtLeastOne, ZcashSerialize},
shutdown::is_shutting_down,
work::equihash::{Solution, SolverCancelled},
};
use zebra_network::AddressBookPeers;
use zebra_node_services::mempool;
use zebra_rpc::{
client::{
BlockTemplateTimeSource,
GetBlockTemplateCapability::{CoinbaseTxn, LongPoll},
GetBlockTemplateParameters,
GetBlockTemplateRequestMode::Template,
HexData,
},
methods::{RpcImpl, RpcServer},
proposal_block_from_template,
};
use zebra_state::WatchReceiver;
use crate::components::metrics::Config;
pub const BLOCK_TEMPLATE_WAIT_TIME: Duration = Duration::from_secs(20);
pub const BLOCK_TEMPLATE_REFRESH_LIMIT: Duration = Duration::from_secs(2);
pub const BLOCK_MINING_WAIT_TIME: Duration = Duration::from_secs(3);
pub fn spawn_init<Mempool, State, ReadState, Tip, AddressBook, BlockVerifierRouter, SyncStatus>(
config: &Config,
rpc: RpcImpl<Mempool, State, ReadState, Tip, AddressBook, BlockVerifierRouter, SyncStatus>,
) -> JoinHandle<Result<(), Report>>
where
Mempool: Service<
mempool::Request,
Response = mempool::Response,
Error = zebra_node_services::BoxError,
> + Clone
+ Send
+ Sync
+ 'static,
Mempool::Future: Send,
State: Service<
zebra_state::Request,
Response = zebra_state::Response,
Error = zebra_state::BoxError,
> + Clone
+ Send
+ Sync
+ 'static,
<State as Service<zebra_state::Request>>::Future: Send,
ReadState: Service<
zebra_state::ReadRequest,
Response = zebra_state::ReadResponse,
Error = zebra_state::BoxError,
> + Clone
+ Send
+ Sync
+ 'static,
<ReadState as Service<zebra_state::ReadRequest>>::Future: Send,
Tip: ChainTip + Clone + Send + Sync + 'static,
BlockVerifierRouter: Service<zebra_consensus::Request, Response = block::Hash, Error = zebra_consensus::BoxError>
+ Clone
+ Send
+ Sync
+ 'static,
<BlockVerifierRouter as Service<zebra_consensus::Request>>::Future: Send,
SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
AddressBook: AddressBookPeers + Clone + Send + Sync + 'static,
{
tokio::spawn(init(config.clone(), rpc).in_current_span())
}
pub async fn init<Mempool, State, ReadState, Tip, BlockVerifierRouter, SyncStatus, AddressBook>(
_config: Config,
rpc: RpcImpl<Mempool, State, ReadState, Tip, AddressBook, BlockVerifierRouter, SyncStatus>,
) -> Result<(), Report>
where
Mempool: Service<
mempool::Request,
Response = mempool::Response,
Error = zebra_node_services::BoxError,
> + Clone
+ Send
+ Sync
+ 'static,
Mempool::Future: Send,
State: Service<
zebra_state::Request,
Response = zebra_state::Response,
Error = zebra_state::BoxError,
> + Clone
+ Send
+ Sync
+ 'static,
<State as Service<zebra_state::Request>>::Future: Send,
ReadState: Service<
zebra_state::ReadRequest,
Response = zebra_state::ReadResponse,
Error = zebra_state::BoxError,
> + Clone
+ Send
+ Sync
+ 'static,
<ReadState as Service<zebra_state::ReadRequest>>::Future: Send,
Tip: ChainTip + Clone + Send + Sync + 'static,
BlockVerifierRouter: Service<zebra_consensus::Request, Response = block::Hash, Error = zebra_consensus::BoxError>
+ Clone
+ Send
+ Sync
+ 'static,
<BlockVerifierRouter as Service<zebra_consensus::Request>>::Future: Send,
SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
AddressBook: AddressBookPeers + Clone + Send + Sync + 'static,
{
let configured_threads = 1;
let available_threads = available_parallelism()
.map(usize::from)
.unwrap_or(configured_threads);
let solver_count = min(configured_threads, available_threads);
info!(
?solver_count,
"launching mining tasks with parallel solvers"
);
let (template_sender, template_receiver) = watch::channel(None);
let template_receiver = WatchReceiver::new(template_receiver);
let mut abort_handles = Vec::new();
let template_generator = tokio::task::spawn(
generate_block_templates(rpc.clone(), template_sender).in_current_span(),
);
abort_handles.push(template_generator.abort_handle());
let template_generator = template_generator.wait_for_panics();
let mut mining_solvers = FuturesUnordered::new();
for solver_id in 0..solver_count {
let solver_id = min(solver_id, usize::from(u8::MAX))
.try_into()
.expect("just limited to u8::MAX");
let solver = tokio::task::spawn(
run_mining_solver(solver_id, template_receiver.clone(), rpc.clone()).in_current_span(),
);
abort_handles.push(solver.abort_handle());
mining_solvers.push(solver.wait_for_panics());
}
let first_result;
select! {
result = template_generator => { first_result = result; }
result = mining_solvers.next() => {
first_result = result
.expect("stream never terminates because there is at least one solver task");
}
}
for abort_handle in abort_handles {
abort_handle.abort();
}
first_result
}
#[instrument(skip(rpc, template_sender))]
pub async fn generate_block_templates<
Mempool,
State,
ReadState,
Tip,
BlockVerifierRouter,
SyncStatus,
AddressBook,
>(
rpc: RpcImpl<Mempool, State, ReadState, Tip, AddressBook, BlockVerifierRouter, SyncStatus>,
template_sender: watch::Sender<Option<Arc<Block>>>,
) -> Result<(), Report>
where
Mempool: Service<
mempool::Request,
Response = mempool::Response,
Error = zebra_node_services::BoxError,
> + Clone
+ Send
+ Sync
+ 'static,
Mempool::Future: Send,
State: Service<
zebra_state::Request,
Response = zebra_state::Response,
Error = zebra_state::BoxError,
> + Clone
+ Send
+ Sync
+ 'static,
<State as Service<zebra_state::Request>>::Future: Send,
ReadState: Service<
zebra_state::ReadRequest,
Response = zebra_state::ReadResponse,
Error = zebra_state::BoxError,
> + Clone
+ Send
+ Sync
+ 'static,
<ReadState as Service<zebra_state::ReadRequest>>::Future: Send,
Tip: ChainTip + Clone + Send + Sync + 'static,
BlockVerifierRouter: Service<zebra_consensus::Request, Response = block::Hash, Error = zebra_consensus::BoxError>
+ Clone
+ Send
+ Sync
+ 'static,
<BlockVerifierRouter as Service<zebra_consensus::Request>>::Future: Send,
SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
AddressBook: AddressBookPeers + Clone + Send + Sync + 'static,
{
let mut parameters =
GetBlockTemplateParameters::new(Template, None, vec![LongPoll, CoinbaseTxn], None, None);
while !template_sender.is_closed() && !is_shutting_down() {
let template: Result<_, _> = rpc.get_block_template(Some(parameters.clone())).await;
let Ok(template) = template else {
warn!(
?BLOCK_TEMPLATE_WAIT_TIME,
?template,
"waiting for a valid block template",
);
if !is_shutting_down() {
sleep(BLOCK_TEMPLATE_WAIT_TIME).await;
}
continue;
};
let template = template
.try_into_template()
.expect("invalid RPC response: proposal in response to a template request");
info!(
height = ?template.height(),
transactions = ?template.transactions().len(),
"mining with an updated block template",
);
parameters = GetBlockTemplateParameters::new(
Template,
None,
vec![LongPoll, CoinbaseTxn],
Some(template.long_poll_id()),
None,
);
let block = proposal_block_from_template(
&template,
BlockTemplateTimeSource::CurTime,
rpc.network(),
)?;
template_sender.send_if_modified(|old_block| {
if old_block.as_ref().map(|b| *b.header) == Some(*block.header) {
return false;
}
*old_block = Some(Arc::new(block));
true
});
if !template_sender.is_closed() && !is_shutting_down() {
sleep(BLOCK_TEMPLATE_REFRESH_LIMIT).await;
}
}
Ok(())
}
#[instrument(skip(template_receiver, rpc))]
pub async fn run_mining_solver<
Mempool,
State,
ReadState,
Tip,
BlockVerifierRouter,
SyncStatus,
AddressBook,
>(
solver_id: u8,
mut template_receiver: WatchReceiver<Option<Arc<Block>>>,
rpc: RpcImpl<Mempool, State, ReadState, Tip, AddressBook, BlockVerifierRouter, SyncStatus>,
) -> Result<(), Report>
where
Mempool: Service<
mempool::Request,
Response = mempool::Response,
Error = zebra_node_services::BoxError,
> + Clone
+ Send
+ Sync
+ 'static,
Mempool::Future: Send,
State: Service<
zebra_state::Request,
Response = zebra_state::Response,
Error = zebra_state::BoxError,
> + Clone
+ Send
+ Sync
+ 'static,
<State as Service<zebra_state::Request>>::Future: Send,
ReadState: Service<
zebra_state::ReadRequest,
Response = zebra_state::ReadResponse,
Error = zebra_state::BoxError,
> + Clone
+ Send
+ Sync
+ 'static,
<ReadState as Service<zebra_state::ReadRequest>>::Future: Send,
Tip: ChainTip + Clone + Send + Sync + 'static,
BlockVerifierRouter: Service<zebra_consensus::Request, Response = block::Hash, Error = zebra_consensus::BoxError>
+ Clone
+ Send
+ Sync
+ 'static,
<BlockVerifierRouter as Service<zebra_consensus::Request>>::Future: Send,
SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
AddressBook: AddressBookPeers + Clone + Send + Sync + 'static,
{
while template_receiver.has_changed().is_ok() && !is_shutting_down() {
template_receiver.mark_as_seen();
let template = template_receiver.cloned_watch_data();
let Some(template) = template else {
if solver_id == 0 {
info!(
?solver_id,
?BLOCK_TEMPLATE_WAIT_TIME,
"solver waiting for initial block template"
);
} else {
debug!(
?solver_id,
?BLOCK_TEMPLATE_WAIT_TIME,
"solver waiting for initial block template"
);
}
if !is_shutting_down() {
sleep(BLOCK_TEMPLATE_WAIT_TIME).await;
}
continue;
};
let height = template.coinbase_height().expect("template is valid");
let mut cancel_receiver = template_receiver.clone();
let old_header = *template.header;
let cancel_fn = move || match cancel_receiver.has_changed() {
Ok(has_changed) => {
cancel_receiver.mark_as_seen();
if has_changed
&& Some(old_header) != cancel_receiver.cloned_watch_data().map(|b| *b.header)
{
Err(SolverCancelled)
} else {
Ok(())
}
}
Err(_sender_dropped) => Err(SolverCancelled),
};
let Ok(blocks) = mine_a_block(solver_id, template, cancel_fn).await else {
if solver_id == 0 {
info!(
?height,
?solver_id,
new_template = ?template_receiver.has_changed(),
shutting_down = ?is_shutting_down(),
"solver cancelled: getting a new block template or shutting down"
);
} else {
debug!(
?height,
?solver_id,
new_template = ?template_receiver.has_changed(),
shutting_down = ?is_shutting_down(),
"solver cancelled: getting a new block template or shutting down"
);
}
if template_receiver.has_changed().is_ok() && !is_shutting_down() {
sleep(BLOCK_TEMPLATE_REFRESH_LIMIT).await;
}
continue;
};
let mut any_success = false;
for block in blocks {
let data = block
.zcash_serialize_to_vec()
.expect("serializing to Vec never fails");
match rpc.submit_block(HexData(data), None).await {
Ok(success) => {
info!(
?height,
hash = ?block.hash(),
?solver_id,
?success,
"successfully mined a new block",
);
any_success = true;
}
Err(error) => info!(
?height,
hash = ?block.hash(),
?solver_id,
?error,
"validating a newly mined block failed, trying again",
),
}
}
if !any_success {
if template_receiver.has_changed().is_ok() && !is_shutting_down() {
sleep(BLOCK_TEMPLATE_REFRESH_LIMIT).await;
}
continue;
}
tokio::select! {
shutdown_result = template_receiver.changed() => shutdown_result?,
_ = sleep(BLOCK_MINING_WAIT_TIME) => {}
}
}
Ok(())
}
pub async fn mine_a_block<F>(
solver_id: u8,
template: Arc<Block>,
cancel_fn: F,
) -> Result<AtLeastOne<Block>, SolverCancelled>
where
F: FnMut() -> Result<(), SolverCancelled> + Send + Sync + 'static,
{
let mut header = *template.header;
*header.nonce.first_mut().unwrap() = solver_id;
*header.nonce.last_mut().unwrap() = solver_id;
let span = Span::current();
let solved_headers =
tokio::task::spawn_blocking(move || span.in_scope(move || {
let miner_thread_handle = ThreadBuilder::default().name("zebra-miner").priority(ThreadPriority::Min).spawn(move |priority_result| {
if let Err(error) = priority_result {
info!(?error, "could not set miner to run at a low priority: running at default priority");
}
Solution::solve(header, cancel_fn)
}).expect("unable to spawn miner thread");
miner_thread_handle.wait_for_panics()
}))
.wait_for_panics()
.await?;
let block = (*template).clone();
let solved_blocks: Vec<Block> = solved_headers
.into_iter()
.map(|header| {
let mut block = block.clone();
block.header = Arc::new(header);
block
})
.collect();
Ok(solved_blocks
.try_into()
.expect("a 1:1 mapping of AtLeastOne produces at least one block"))
}