use {
crate::{
banking_trace::BankingTracer,
replay_stage::{Finalizer, ReplayStage},
},
agave_votor::{common::block_timeout, event::LeaderWindowInfo},
crossbeam_channel::Receiver,
solana_clock::Slot,
solana_gossip::cluster_info::ClusterInfo,
solana_hash::Hash,
solana_ledger::{blockstore::Blockstore, leader_schedule_cache::LeaderScheduleCache},
solana_measure::measure::Measure,
solana_poh::{
poh_recorder::{GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS, PohRecorder, PohRecorderError},
record_channels::RecordReceiver,
},
solana_pubkey::Pubkey,
solana_rpc::{rpc_subscriptions::RpcSubscriptions, slot_status_notifier::SlotStatusNotifier},
solana_runtime::{
bank::{Bank, NewBankOptions},
bank_forks::BankForks,
leader_schedule_utils::{last_of_consecutive_leader_slots, leader_slot_index},
},
stats::{LoopMetrics, SlotMetrics},
std::{
sync::{
Arc, Condvar, Mutex, RwLock,
atomic::{AtomicBool, Ordering},
},
thread::{self, Builder, JoinHandle},
time::{Duration, Instant},
},
thiserror::Error,
};
mod stats;
/// Handle to the dedicated thread running the block creation loop.
pub struct BlockCreationLoop {
    thread: JoinHandle<()>,
}

impl BlockCreationLoop {
    /// Spawns the `solBlkCreatLoop` thread, which runs the block creation loop with
    /// `config` until it returns.
    ///
    /// # Panics
    /// Panics if the thread cannot be spawned.
    pub fn new(config: BlockCreationLoopConfig) -> Self {
        let builder = Builder::new().name(String::from("solBlkCreatLoop"));
        let handle = builder
            .spawn(move || {
                info!("BlockCreationLoop has started");
                start_loop(config);
                info!("BlockCreationLoop has stopped");
            })
            .unwrap();
        Self { thread: handle }
    }

    /// Waits for the block creation thread to finish.
    ///
    /// Returns `Err` carrying the panic payload if the thread panicked.
    pub fn join(self) -> thread::Result<()> {
        let Self { thread } = self;
        thread.join()
    }
}
/// Everything the block creation thread needs; consumed by `BlockCreationLoop::new`.
pub struct BlockCreationLoopConfig {
    /// Shared exit flag. The loop stops once it is set. It is also handed to a
    /// `Finalizer` when the loop starts.
    pub exit: Arc<AtomicBool>,
    /// Parent banks are looked up here and new leader banks are inserted here.
    pub bank_forks: Arc<RwLock<BankForks>>,
    /// Passed to `LeaderScheduleCache::next_leader_slot` when resetting PoH.
    pub blockstore: Arc<Blockstore>,
    /// Source of this node's identity (`id()`), re-read on every loop iteration.
    pub cluster_info: Arc<ClusterInfo>,
    /// PoH recorder. Switched into Alpenglow mode when the loop starts and used to
    /// record entries into the leader bank.
    pub poh_recorder: Arc<RwLock<PohRecorder>>,
    /// Used to find the leader of a slot and this node's next leader slot.
    pub leader_schedule_cache: Arc<LeaderScheduleCache>,
    /// Forwarded to `ReplayStage::new_bank_from_parent_with_notify`.
    pub rpc_subscriptions: Option<Arc<RpcSubscriptions>>,
    /// Receives a `hash_event` for each parent bank a leader bank is built on.
    pub banking_tracer: Arc<BankingTracer>,
    /// Forwarded to `ReplayStage::new_bank_from_parent_with_notify`.
    pub slot_status_notifier: Option<SlotStatusNotifier>,
    /// Leader windows to produce. If several are queued, only the latest one is used.
    pub leader_window_info_receiver: Receiver<LeaderWindowInfo>,
    /// Only `.0` is read in this file. If it is past the last slot of our leader window,
    /// the window is abandoned.
    /// NOTE(review): presumably `(slot, (parent slot, parent hash))` — confirm.
    pub highest_parent_ready: Arc<RwLock<(Slot, (Slot, Hash))>>,
    /// Waited on while the parent of a leader slot is still being replayed.
    pub replay_highest_frozen: Arc<ReplayHighestFrozen>,
    /// Delivers the `RecordReceiver` from the PoH service. It is received once, and the
    /// loop does nothing until it arrives.
    pub record_receiver_receiver: Receiver<RecordReceiver>,
}
/// Mutable state threaded through block production. It is built once in `start_loop`
/// from `BlockCreationLoopConfig`.
struct LeaderContext {
    /// Shared exit flag; production stops once it is set.
    exit: Arc<AtomicBool>,
    /// This node's identity. `start_loop` refreshes it when `cluster_info.id()` changes.
    my_pubkey: Pubkey,
    /// Leader window notifications.
    leader_window_info_receiver: Receiver<LeaderWindowInfo>,
    /// Only `.0` is read here: it is compared against the last slot of the leader window.
    highest_parent_ready: Arc<RwLock<(Slot, (Slot, Hash))>>,
    /// Passed to `next_leader_slot` when resetting PoH.
    blockstore: Arc<Blockstore>,
    /// Source of record batches for the current leader bank. It is shut down at the
    /// block timeout and restarted with each new leader bank's id.
    record_receiver: RecordReceiver,
    /// PoH recorder that holds the current leader bank while it is being produced.
    poh_recorder: Arc<RwLock<PohRecorder>>,
    /// Leader lookups.
    leader_schedule_cache: Arc<LeaderScheduleCache>,
    /// Parent bank lookup and leader bank insertion.
    bank_forks: Arc<RwLock<BankForks>>,
    /// Forwarded when creating leader banks.
    rpc_subscriptions: Option<Arc<RpcSubscriptions>>,
    /// Forwarded when creating leader banks.
    slot_status_notifier: Option<SlotStatusNotifier>,
    /// Receives a hash event for each parent bank.
    banking_tracer: Arc<BankingTracer>,
    /// Waited on while replay of a parent slot is behind.
    replay_highest_frozen: Arc<ReplayHighestFrozen>,
    /// Loop-level metrics, reported from `start_loop`.
    metrics: LoopMetrics,
    /// Per-leader-slot metrics, reset when a leader bank is created.
    slot_metrics: SlotMetrics,
}
/// Highest frozen slot, paired with a condvar so the block creation loop can wait for it
/// to advance.
///
/// In this file the value is only read: the loop locks `highest_frozen_slot` and waits on
/// `freeze_notification` while the value is below the parent slot it needs.
/// NOTE(review): the writer/notifier (presumably replay) is outside this file — confirm it
/// notifies after updating the value.
#[derive(Default)]
pub struct ReplayHighestFrozen {
    /// Highest frozen slot published so far.
    pub highest_frozen_slot: Mutex<Slot>,
    /// Condvar paired with `highest_frozen_slot`. Waiters here expect to be woken when
    /// the value advances.
    pub freeze_notification: Condvar,
}
/// Reasons a leader slot could not be started.
#[derive(Debug, Error)]
enum StartLeaderError {
    /// The parent slot (field 0) was missing from `BankForks`, was not frozen, or was not
    /// replayed before the deadline for the leader slot (field 1).
    #[error("Replay is behind for parent slot {0} for leader slot {1}")]
    ReplayIsBehind(/* parent slot */ Slot, /* leader slot */ Slot),

    /// `BankForks` already contains a bank for the leader slot.
    #[error("Already contain bank for leader slot {0}")]
    AlreadyHaveBank(/* leader slot */ Slot),

    /// The highest parent-ready slot (field 0) is past the end of the leader window that
    /// contains the leader slot (field 1).
    #[error("Cluster has certified blocks before {0} which is after our leader slot {1}")]
    ClusterCertifiedBlocksAfterWindow(/* highest parent ready */ Slot, /* leader slot */ Slot),
}
fn start_loop(config: BlockCreationLoopConfig) {
let BlockCreationLoopConfig {
exit,
bank_forks,
blockstore,
cluster_info,
poh_recorder,
leader_schedule_cache,
rpc_subscriptions,
banking_tracer,
slot_status_notifier,
leader_window_info_receiver,
highest_parent_ready,
replay_highest_frozen,
record_receiver_receiver,
} = config;
let _exit = Finalizer::new(exit.clone());
let mut my_pubkey = cluster_info.id();
info!("{my_pubkey}: Block creation loop initialized");
let record_receiver = match record_receiver_receiver.recv() {
Ok(receiver) => receiver,
Err(e) => {
error!("{my_pubkey}: Failed to receive RecordReceiver from PohService. Exiting: {e:?}",);
return;
}
};
info!("{my_pubkey}: PohService has shutdown, BlockCreationLoop is enabled");
let mut ctx = LeaderContext {
exit,
my_pubkey,
leader_window_info_receiver,
highest_parent_ready,
blockstore,
record_receiver,
poh_recorder,
leader_schedule_cache,
bank_forks,
rpc_subscriptions,
slot_status_notifier,
banking_tracer,
replay_highest_frozen,
metrics: LoopMetrics::default(),
slot_metrics: SlotMetrics::default(),
};
{
let mut w_poh_recorder = ctx.poh_recorder.write().unwrap();
w_poh_recorder.enable_alpenglow();
}
reset_poh_recorder(&ctx.bank_forks.read().unwrap().working_bank(), &ctx);
while !ctx.exit.load(Ordering::Relaxed) {
if my_pubkey != cluster_info.id() {
let my_old_pubkey = my_pubkey;
my_pubkey = cluster_info.id();
ctx.my_pubkey = my_pubkey;
warn!(
"Identity changed from {my_old_pubkey} to {my_pubkey} during block creation loop"
);
}
let LeaderWindowInfo {
start_slot,
end_slot,
parent_block: (parent_slot, _),
skip_timer,
} = {
let Some(info) = ctx
.leader_window_info_receiver
.recv_timeout(Duration::from_secs(1))
.ok()
.and_then(|window| {
ctx.leader_window_info_receiver
.try_iter()
.last()
.or(Some(window))
})
else {
continue;
};
info
};
trace!("Received window notification for {start_slot} to {end_slot} parent: {parent_slot}");
if let Err(e) = produce_window(start_slot, end_slot, parent_slot, skip_timer, &mut ctx) {
error!(
"{my_pubkey}: Unable to produce window {start_slot}-{end_slot}, skipping window: \
{e:?}"
);
}
ctx.metrics.loop_count += 1;
ctx.metrics.report(Duration::from_secs(1));
}
info!("{my_pubkey}: Block creation loop shutting down");
}
/// Resets the `PohRecorder` onto `bank`.
///
/// This node's next leader slot after `bank.slot()` is looked up with the blockstore and
/// a grace of `GRACE_TICKS_FACTOR * MAX_GRACE_SLOTS`. It is passed to the recorder
/// together with `bank`.
///
/// # Panics
/// Panics unless the record receiver is both shut down and safe to restart.
fn reset_poh_recorder(bank: &Arc<Bank>, ctx: &LeaderContext) {
    let slot = bank.slot();
    trace!("{}: resetting poh to {slot}", ctx.my_pubkey);
    assert!(ctx.record_receiver.is_shutdown() && ctx.record_receiver.is_safe_to_restart());

    let grace_ticks = GRACE_TICKS_FACTOR * MAX_GRACE_SLOTS;
    let next_leader_slot = ctx.leader_schedule_cache.next_leader_slot(
        &ctx.my_pubkey,
        slot,
        bank,
        Some(&*ctx.blockstore),
        grace_ticks,
    );

    let mut recorder = ctx.poh_recorder.write().unwrap();
    recorder.reset(Arc::clone(bank), next_leader_slot);
}
fn produce_window(
start_slot: Slot,
end_slot: Slot,
mut parent_slot: Slot,
skip_timer: Instant,
ctx: &mut LeaderContext,
) -> Result<(), StartLeaderError> {
let my_pubkey = ctx.my_pubkey;
let mut window_production_start = Measure::start("window_production");
let mut slot = start_slot;
while !ctx.exit.load(Ordering::Relaxed) && slot <= end_slot {
start_leader_wait_for_parent_replay(slot, parent_slot, skip_timer, ctx)?;
let leader_index = leader_slot_index(slot);
let timeout = block_timeout(leader_index);
trace!(
"{my_pubkey}: waiting for leader bank {slot} to finish, remaining time: {}",
timeout.saturating_sub(skip_timer.elapsed()).as_millis(),
);
let mut bank_completion_measure = Measure::start("bank_completion");
if let Err(e) = record_and_complete_block(
ctx.poh_recorder.as_ref(),
&mut ctx.record_receiver,
skip_timer,
timeout,
) {
panic!("PohRecorder record failed: {e:?}");
}
assert!(!ctx.poh_recorder.read().unwrap().has_bank());
bank_completion_measure.stop();
ctx.slot_metrics.report();
ctx.metrics.bank_timeout_completion_count += 1;
let _ = ctx
.metrics
.bank_timeout_completion_elapsed_hist
.increment(bank_completion_measure.as_us())
.inspect_err(|e| {
error!(
"{}: unable to increment bank completion histogram {e:?}",
ctx.my_pubkey
);
});
parent_slot = slot;
slot += 1;
}
trace!("{my_pubkey}: finished leader window {start_slot}-{end_slot}");
window_production_start.stop();
ctx.metrics.window_production_elapsed += window_production_start.as_us();
Ok(())
}
/// Records entries into the current leader bank until its block timeout, then ticks the
/// bank to completion.
///
/// It runs in three phases:
/// 1. Until `block_timeout` has elapsed since `block_timer`, poll `record_receiver`
///    without blocking and record every batch received. This phase spins; it never
///    sleeps or blocks.
/// 2. Shut the receiver down and record whatever is still buffered in it.
/// 3. Set the bank's tick height to `max_tick_height - 1`, then call
///    `tick_alpenglow(max_tick_height)`. That call presumably emits the final tick that
///    completes the bank; confirm in `PohRecorder`.
///
/// # Errors
/// Returns the first error from `PohRecorder::record`. Nothing after it is recorded, and
/// the bank is not ticked.
///
/// # Panics
/// Panics if the `PohRecorder` has no working bank once recording has finished.
fn record_and_complete_block(
    poh_recorder: &RwLock<PohRecorder>,
    record_receiver: &mut RecordReceiver,
    block_timer: Instant,
    block_timeout: Duration,
) -> Result<(), PohRecorderError> {
    // Phase 1: record until the block timeout.
    loop {
        let remaining = block_timeout.saturating_sub(block_timer.elapsed());
        if remaining.is_zero() {
            break;
        }
        if let Ok(record) = record_receiver.try_recv() {
            poh_recorder.write().unwrap().record(
                record.bank_id,
                record.mixins,
                record.transaction_batches,
            )?;
        }
    }

    // Phase 2: stop intake and flush whatever is already queued.
    record_receiver.shutdown();
    for pending in record_receiver.drain() {
        poh_recorder.write().unwrap().record(
            pending.bank_id,
            pending.mixins,
            pending.transaction_batches,
        )?;
    }

    // Phase 3: tick the working bank to completion.
    let mut recorder = poh_recorder.write().unwrap();
    let working_bank = recorder
        .bank()
        .expect("Bank cannot have been cleared as BlockCreationLoop is the only modifier");
    trace!(
        "{}: bank {} has reached block timeout, ticking",
        working_bank.leader_id(),
        working_bank.slot()
    );
    let max_tick_height = working_bank.max_tick_height();
    working_bank.set_tick_height(max_tick_height - 1);
    // Release our handle to the bank before the final tick.
    // NOTE(review): presumably so no extra reference is alive while the bank completes.
    drop(working_bank);
    recorder.tick_alpenglow(max_tick_height);
    Ok(())
}
fn start_leader_wait_for_parent_replay(
slot: Slot,
parent_slot: Slot,
skip_timer: Instant,
ctx: &mut LeaderContext,
) -> Result<(), StartLeaderError> {
trace!(
"{}: Attempting to start leader slot {slot} parent {parent_slot}",
ctx.my_pubkey
);
let my_pubkey = ctx.my_pubkey;
let timeout = block_timeout(leader_slot_index(slot));
let end_slot = last_of_consecutive_leader_slots(slot);
let mut slot_delay_start = Measure::start("slot_delay");
while !timeout.saturating_sub(skip_timer.elapsed()).is_zero() {
ctx.slot_metrics.attempt_start_leader_count += 1;
let highest_parent_ready_slot = ctx.highest_parent_ready.read().unwrap().0;
if highest_parent_ready_slot > end_slot {
trace!(
"{my_pubkey}: Skipping production of {slot} because highest parent ready slot is \
{highest_parent_ready_slot} > end slot {end_slot}"
);
ctx.metrics.skipped_window_behind_parent_ready_count += 1;
return Err(StartLeaderError::ClusterCertifiedBlocksAfterWindow(
highest_parent_ready_slot,
slot,
));
}
match maybe_start_leader(slot, parent_slot, ctx) {
Ok(()) => {
slot_delay_start.stop();
let _ = ctx
.slot_metrics
.slot_delay_hist
.increment(slot_delay_start.as_us())
.inspect_err(|e| {
error!(
"{}: unable to increment slot delay histogram {e:?}",
ctx.my_pubkey
);
});
return Ok(());
}
Err(StartLeaderError::ReplayIsBehind(_, _)) => {
trace!(
"{my_pubkey}: Attempting to produce slot {slot}, however replay of the the \
parent {parent_slot} is not yet finished, waiting. Skip timer {}",
skip_timer.elapsed().as_millis()
);
let highest_frozen_slot = ctx
.replay_highest_frozen
.highest_frozen_slot
.lock()
.unwrap();
let mut wait_start = Measure::start("replay_is_behind");
let _unused = ctx
.replay_highest_frozen
.freeze_notification
.wait_timeout_while(
highest_frozen_slot,
timeout.saturating_sub(skip_timer.elapsed()),
|hfs| *hfs < parent_slot,
)
.unwrap();
wait_start.stop();
ctx.slot_metrics.replay_is_behind_cumulative_wait_elapsed += wait_start.as_us();
let _ = ctx
.slot_metrics
.replay_is_behind_wait_elapsed_hist
.increment(wait_start.as_us())
.inspect_err(|e| {
error!(
"{}: unable to increment replay is behind histogram {e:?}",
ctx.my_pubkey
);
});
}
Err(e) => return Err(e),
}
}
trace!(
"{my_pubkey}: Skipping production of {slot}: Unable to replay parent {parent_slot} in time"
);
Err(StartLeaderError::ReplayIsBehind(parent_slot, slot))
}
/// Tries to create and insert our leader bank for `slot` on top of `parent_slot`.
///
/// # Errors
/// - `AlreadyHaveBank` if `bank_forks` already contains `slot`.
/// - `ReplayIsBehind` if the parent bank is not in `bank_forks` or is not yet frozen.
///   Callers may retry once replay makes progress.
fn maybe_start_leader(
    slot: Slot,
    parent_slot: Slot,
    ctx: &mut LeaderContext,
) -> Result<(), StartLeaderError> {
    // Check both slots under a single read guard so they come from the same snapshot.
    // The guard is dropped at the end of this block, before
    // `create_and_insert_leader_bank` takes the `bank_forks` write lock.
    let parent_bank = {
        let bank_forks = ctx.bank_forks.read().unwrap();
        if bank_forks.get(slot).is_some() {
            ctx.slot_metrics.already_have_bank_count += 1;
            return Err(StartLeaderError::AlreadyHaveBank(slot));
        }
        bank_forks.get(parent_slot)
    };

    let Some(parent_bank) = parent_bank.filter(|bank| bank.is_frozen()) else {
        ctx.slot_metrics.replay_is_behind_count += 1;
        return Err(StartLeaderError::ReplayIsBehind(parent_slot, slot));
    };

    create_and_insert_leader_bank(slot, parent_bank, ctx);
    Ok(())
}
/// Creates our leader bank for `slot` on top of `parent_bank` and installs it as the
/// `PohRecorder`'s working bank.
///
/// If the recorder's start slot is not the parent slot, the recorder is first reset onto
/// `parent_bank`. The new bank is inserted into `bank_forks`, and the record receiver is
/// restarted with the new bank's id. Per-slot metrics are reset.
///
/// # Panics
/// Each of these indicates a bug in the block creation loop:
/// - the leader schedule has no leader for `slot`;
/// - the scheduled leader is not `ctx.my_pubkey`;
/// - the `PohRecorder` still holds a bank.
fn create_and_insert_leader_bank(slot: Slot, parent_bank: Arc<Bank>, ctx: &mut LeaderContext) {
    let my_pubkey = ctx.my_pubkey;
    let parent_slot = parent_bank.slot();
    let root_slot = ctx.bank_forks.read().unwrap().root();
    trace!(
        "{my_pubkey}: Creating and inserting leader slot {slot} parent {parent_slot} \
         root {root_slot}"
    );

    // Sanity checks: we must be the scheduled leader, and no bank may be in progress.
    let leader = ctx
        .leader_schedule_cache
        .slot_leader_at(slot, Some(&parent_bank))
        .unwrap_or_else(|| {
            panic!(
                "{}: No leader found for slot {slot} with parent {parent_slot}. Something has gone \
                 wrong with the block creation loop. exiting",
                my_pubkey,
            )
        });
    assert!(
        my_pubkey == leader.id,
        "{}: Attempting to produce a block for {slot}, however the leader is {}. Something \
         has gone wrong with the block creation loop. exiting",
        my_pubkey,
        leader.id,
    );
    if let Some(bank) = ctx.poh_recorder.read().unwrap().bank() {
        panic!(
            "{}: Attempting to produce a block for {slot}, however we still are in production of \
             {}. Something has gone wrong with the block creation loop. exiting",
            my_pubkey,
            bank.slot(),
        );
    }

    // Re-anchor PoH on the parent if it is currently positioned elsewhere.
    let recorder_start_slot = ctx.poh_recorder.read().unwrap().start_slot();
    if recorder_start_slot != parent_slot {
        reset_poh_recorder(&parent_bank, ctx);
    }

    let new_bank = ReplayStage::new_bank_from_parent_with_notify(
        Arc::clone(&parent_bank),
        slot,
        root_slot,
        leader,
        ctx.rpc_subscriptions.as_deref(),
        &ctx.slot_status_notifier,
        NewBankOptions::default(),
    );
    let parent_blockhash = parent_bank.last_blockhash();
    let parent_hash = parent_bank.hash();
    ctx.banking_tracer
        .hash_event(parent_slot, &parent_blockhash, &parent_hash);

    let leader_bank = ctx.bank_forks.write().unwrap().insert(new_bank);
    let bank_id = leader_bank.bank_id();
    ctx.poh_recorder.write().unwrap().set_bank(leader_bank);
    ctx.record_receiver.restart(bank_id);
    ctx.slot_metrics.reset(slot);

    info!("{my_pubkey}: new fork:{slot} parent:{parent_slot} (leader) root:{root_slot}");
}