use {
agave_banking_stage_ingress_types::BankingPacketBatch,
assert_matches::assert_matches,
crossbeam_channel::unbounded,
itertools::Itertools,
log::*,
solana_core::{
banking_stage::{unified_scheduler::ensure_banking_stage_setup, BankingStage},
banking_trace::BankingTracer,
consensus::{
heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice,
progress_map::{ForkProgress, ProgressMap},
},
drop_bank_service::DropBankService,
repair::cluster_slot_state_verifier::{
DuplicateConfirmedSlots, DuplicateSlotsTracker, EpochSlotsFrozenSlots,
},
replay_stage::ReplayStage,
unfrozen_gossip_verified_vote_hashes::UnfrozenGossipVerifiedVoteHashes,
},
solana_entry::entry::Entry,
solana_gossip::cluster_info::{ClusterInfo, Node},
solana_hash::Hash,
solana_keypair::Keypair,
solana_ledger::{
blockstore::Blockstore, create_new_tmp_ledger_auto_delete,
genesis_utils::create_genesis_config, leader_schedule_cache::LeaderScheduleCache,
},
solana_perf::packet::to_packet_batches,
solana_poh::poh_recorder::create_test_recorder,
solana_pubkey::Pubkey,
solana_runtime::{
bank::Bank, bank_forks::BankForks, genesis_utils::GenesisConfigInfo,
installed_scheduler_pool::SchedulingContext,
prioritization_fee_cache::PrioritizationFeeCache,
},
solana_runtime_transaction::runtime_transaction::RuntimeTransaction,
solana_signer::Signer,
solana_streamer::socket::SocketAddrSpace,
solana_system_transaction as system_transaction,
solana_timings::ExecuteTimings,
solana_transaction_error::TransactionResult as Result,
solana_unified_scheduler_logic::{SchedulingMode, Task},
solana_unified_scheduler_pool::{
DefaultSchedulerPool, DefaultTaskHandler, HandlerContext, PooledScheduler, SchedulerPool,
TaskHandler,
},
std::{
collections::HashMap,
sync::{atomic::Ordering, Arc, Mutex},
thread::sleep,
time::Duration,
},
};
#[test]
fn test_scheduler_waited_by_drop_bank_service() {
solana_logger::setup();
static LOCK_TO_STALL: Mutex<()> = Mutex::new(());
#[derive(Debug)]
struct StallingHandler;
impl TaskHandler for StallingHandler {
fn handle(
result: &mut Result<()>,
timings: &mut ExecuteTimings,
scheduling_context: &SchedulingContext,
task: &Task,
handler_context: &HandlerContext,
) {
info!("Stalling at StallingHandler::handle()...");
*LOCK_TO_STALL.lock().unwrap();
std::thread::sleep(std::time::Duration::from_secs(3));
info!("Now entering into DefaultTaskHandler::handle()...");
DefaultTaskHandler::handle(result, timings, scheduling_context, task, handler_context);
}
}
let GenesisConfigInfo {
genesis_config,
mint_keypair,
..
} = create_genesis_config(10_000);
let genesis_bank = Bank::new_for_tests(&genesis_config);
let bank_forks = BankForks::new_rw_arc(genesis_bank);
let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let pool_raw = SchedulerPool::<PooledScheduler<StallingHandler>, _>::new(
None,
None,
None,
None,
ignored_prioritization_fee_cache,
);
let pool = pool_raw.clone();
bank_forks.write().unwrap().install_scheduler_pool(pool);
let genesis = 0;
let genesis_bank = &bank_forks.read().unwrap().get(genesis).unwrap();
genesis_bank.set_fork_graph_in_program_cache(Arc::downgrade(&bank_forks));
let pruned = 2;
let pruned_bank = Bank::new_from_parent(genesis_bank.clone(), &Pubkey::default(), pruned);
let pruned_bank = bank_forks.write().unwrap().insert(pruned_bank);
let root = 3;
let root_bank = Bank::new_from_parent(genesis_bank.clone(), &Pubkey::default(), root);
root_bank.freeze();
let root_hash = root_bank.hash();
bank_forks.write().unwrap().insert(root_bank);
let tx = RuntimeTransaction::from_transaction_for_tests(system_transaction::transfer(
&mint_keypair,
&solana_pubkey::new_rand(),
2,
genesis_config.hash(),
));
let lock_to_stall = LOCK_TO_STALL.lock().unwrap();
pruned_bank
.schedule_transaction_executions([(tx, 0)].into_iter())
.unwrap();
drop(pruned_bank);
assert_eq!(pool_raw.pooled_scheduler_count(), 0);
drop(lock_to_stall);
let (drop_bank_sender1, drop_bank_receiver1) = unbounded();
let (drop_bank_sender2, drop_bank_receiver2) = unbounded();
let drop_bank_service = DropBankService::new(drop_bank_receiver2);
info!("calling handle_new_root()...");
{
let mut heaviest_subtree_fork_choice = HeaviestSubtreeForkChoice::new((root, root_hash));
let mut progress = ProgressMap::default();
for i in genesis..=root {
progress.insert(i, ForkProgress::new(Hash::default(), None, None, 0, 0));
}
let mut duplicate_slots_tracker: DuplicateSlotsTracker =
vec![root - 1, root, root + 1].into_iter().collect();
let mut duplicate_confirmed_slots: DuplicateConfirmedSlots = vec![root - 1, root, root + 1]
.into_iter()
.map(|s| (s, Hash::default()))
.collect();
let mut unfrozen_gossip_verified_vote_hashes: UnfrozenGossipVerifiedVoteHashes =
UnfrozenGossipVerifiedVoteHashes {
votes_per_slot: vec![root - 1, root, root + 1]
.into_iter()
.map(|s| (s, HashMap::new()))
.collect(),
};
let mut epoch_slots_frozen_slots: EpochSlotsFrozenSlots = vec![root - 1, root, root + 1]
.into_iter()
.map(|slot| (slot, Hash::default()))
.collect();
ReplayStage::handle_new_root(
root,
&bank_forks,
&mut progress,
None, None,
&mut heaviest_subtree_fork_choice,
&mut duplicate_slots_tracker,
&mut duplicate_confirmed_slots,
&mut unfrozen_gossip_verified_vote_hashes,
&mut true,
&mut Vec::new(),
&mut epoch_slots_frozen_slots,
&drop_bank_sender1,
)
.unwrap();
}
let pruned_banks = drop_bank_receiver1.recv().unwrap();
assert_eq!(
pruned_banks
.iter()
.map(|b| b.slot())
.sorted()
.collect::<Vec<_>>(),
vec![genesis, pruned]
);
info!("sending pruned banks to DropBankService...");
drop_bank_sender2.send(pruned_banks).unwrap();
info!("joining the drop bank service...");
drop((
(drop_bank_sender1, drop_bank_receiver1),
(drop_bank_sender2,),
));
drop_bank_service.join().unwrap();
info!("finally joined the drop bank service!");
assert_eq!(pool_raw.pooled_scheduler_count(), 1);
}
#[test]
fn test_scheduler_producing_blocks() {
solana_logger::setup();
let GenesisConfigInfo {
genesis_config,
mint_keypair,
..
} = create_genesis_config(10_000);
let (ledger_path, _blockhash) = create_new_tmp_ledger_auto_delete!(&genesis_config);
let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
let genesis_bank = Bank::new_for_tests(&genesis_config);
let bank_forks = BankForks::new_rw_arc(genesis_bank);
let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let genesis_bank = bank_forks.read().unwrap().working_bank_with_scheduler();
genesis_bank.set_fork_graph_in_program_cache(Arc::downgrade(&bank_forks));
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&genesis_bank));
let (exit, poh_recorder, transaction_recorder, poh_service, signal_receiver) =
create_test_recorder(
genesis_bank.clone(),
blockstore.clone(),
None,
Some(leader_schedule_cache),
);
let pool = DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache);
let channels = {
let banking_tracer = BankingTracer::new_disabled();
banking_tracer.create_channels(true)
};
let cluster_info = {
let keypair = Arc::new(Keypair::new());
let node = Node::new_localhost_with_pubkey(&keypair.pubkey());
Arc::new(ClusterInfo::new(
node.info,
keypair,
SocketAddrSpace::Unspecified,
))
};
ensure_banking_stage_setup(
&pool,
&bank_forks,
&channels,
&cluster_info,
&poh_recorder,
transaction_recorder,
BankingStage::num_threads(),
);
bank_forks.write().unwrap().install_scheduler_pool(pool);
while poh_recorder.read().unwrap().bank().is_some() {
sleep(Duration::from_millis(100));
}
let tx = system_transaction::transfer(
&mint_keypair,
&solana_pubkey::new_rand(),
1,
genesis_config.hash(),
);
let banking_packet_batch = BankingPacketBatch::new(to_packet_batches(&vec![tx.clone(); 1], 1));
let tx = RuntimeTransaction::from_transaction_for_tests(tx);
let tpu_bank = Bank::new_from_parent(genesis_bank.clone(), &Pubkey::default(), 2);
let tpu_bank = bank_forks
.write()
.unwrap()
.insert_with_scheduling_mode(SchedulingMode::BlockProduction, tpu_bank);
poh_recorder
.write()
.unwrap()
.set_bank(tpu_bank.clone_with_scheduler(), false);
tpu_bank.unpause_new_block_production_scheduler();
let tpu_bank = bank_forks.read().unwrap().working_bank_with_scheduler();
assert_eq!(tpu_bank.transaction_count(), 0);
channels
.unified_sender()
.send(banking_packet_batch)
.unwrap();
while poh_recorder.read().unwrap().bank().is_some() {
sleep(Duration::from_millis(100));
}
assert_matches!(tpu_bank.wait_for_completed_scheduler(), Some((Ok(()), _)));
assert_eq!(tpu_bank.transaction_count(), 1);
assert_matches!(
signal_receiver.into_iter().find(|(_, (entry, _))| !entry.is_tick()),
Some((_, (Entry {transactions, ..}, _))) if transactions == [tx.to_versioned_transaction()]
);
exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();
}