use {
crate::bank::Bank,
assert_matches::assert_matches,
log::*,
solana_clock::Slot,
solana_hash::Hash,
solana_runtime_transaction::runtime_transaction::RuntimeTransaction,
solana_svm_timings::ExecuteTimings,
solana_transaction::sanitized::SanitizedTransaction,
solana_transaction_error::{TransactionError, TransactionResult as Result},
solana_unified_scheduler_logic::{OrderedTaskId, SchedulingMode},
std::{
fmt::{self, Debug},
mem,
ops::Deref,
sync::{Arc, RwLock},
thread,
},
};
#[cfg(feature = "dev-context-only-utils")]
use {mockall::automock, qualifier_attr::qualifiers};
/// Returns the initial [`ResultWithTimings`]: `Ok(())` paired with default
/// [`ExecuteTimings`].
///
/// [`InstalledSchedulerPool::take_scheduler`] uses this as the starting value when a
/// scheduler is taken without a previously accumulated result.
pub fn initialized_result_with_timings() -> ResultWithTimings {
    let fresh_timings = ExecuteTimings::default();
    (Ok(()), fresh_timings)
}
/// A pool from which installed schedulers are taken.
///
/// Implementors must be shareable across threads (`Send + Sync`) and `Debug`.
pub trait InstalledSchedulerPool: Send + Sync + Debug {
    /// Takes a scheduler for `context`, starting from a fresh result-with-timings.
    ///
    /// This is [`Self::take_resumed_scheduler`] called with
    /// [`initialized_result_with_timings()`].
    fn take_scheduler(&self, context: SchedulingContext) -> Option<InstalledSchedulerBox> {
        let fresh_result_with_timings = initialized_result_with_timings();
        self.take_resumed_scheduler(context, fresh_result_with_timings)
    }

    /// Takes a scheduler for `context`, seeding it with an already accumulated
    /// `result_with_timings`.
    ///
    /// Returns `None` when no scheduler is handed out; the conditions for that are up to
    /// the implementation.
    fn take_resumed_scheduler(
        &self,
        context: SchedulingContext,
        result_with_timings: ResultWithTimings,
    ) -> Option<InstalledSchedulerBox>;

    /// Hands a [`TimeoutListener`] to the pool.
    ///
    /// When (and whether) the pool calls [`TimeoutListener::trigger`] is decided by the
    /// implementation.
    fn register_timeout_listener(&self, timeout_listener: TimeoutListener);

    /// Consumes one `Arc` handle to the pool.
    ///
    /// NOTE(review): judging by the name this is invoked once the pool is no longer
    /// installed in bank forks; confirm against the callers.
    fn uninstalled_from_bank_forks(self: Arc<Self>);

    /// Requests block-production mode to be enabled or disabled according to `enable`.
    ///
    /// The meaning of the returned `bool` is defined by the implementation; it is marked
    /// `#[must_use]`, so callers are expected to inspect it.
    #[must_use]
    fn toggle_block_production_mode(&self, enable: bool) -> bool;
}
/// Marker error returned by `InstalledScheduler::schedule_execution()` when scheduling could
/// not be done because the scheduler has aborted.
///
/// It carries no detail; within this file the concrete `TransactionError` is recovered
/// separately by `BankWithSchedulerInner::retrieve_error_after_schedule_failure()`.
#[derive(Debug)]
pub struct SchedulerAborted;
/// Outcome of a scheduling attempt: `Ok(())` or `Err(SchedulerAborted)`.
pub type ScheduleResult = std::result::Result<(), SchedulerAborted>;
/// A one-shot callback that is handed to an `InstalledSchedulerPool` via
/// `register_timeout_listener()` and later run with `trigger()`.
pub struct TimeoutListener {
// Boxed `FnOnce`, so the callback can run at most once; it receives the pool passed to
// `trigger()`.
callback: Box<dyn FnOnce(InstalledSchedulerPoolArc) + Sync + Send>,
}
impl TimeoutListener {
    /// Wraps `f` as a one-shot callback to be run later through [`Self::trigger`].
    pub(crate) fn new(f: impl FnOnce(InstalledSchedulerPoolArc) + Sync + Send + 'static) -> Self {
        let callback: Box<dyn FnOnce(InstalledSchedulerPoolArc) + Sync + Send> = Box::new(f);
        Self { callback }
    }

    /// Consumes the listener and invokes its callback with `pool`.
    pub fn trigger(self, pool: InstalledSchedulerPoolArc) {
        let Self { callback } = self;
        callback(pool);
    }
}
impl Debug for TimeoutListener {
    /// Formats the listener as `TimeoutListener(<address>)`; the boxed closure itself is not
    /// `Debug`, so only the pointer of `self` is shown.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("TimeoutListener({self:p})"))
    }
}
#[cfg_attr(doc, aquamarine::aquamarine)]
/// A scheduler that is currently installed into a bank (see `BankWithScheduler`).
///
/// Transactions are handed to it with `schedule_execution()`; it is consumed by
/// `wait_for_termination()`, which yields the accumulated `ResultWithTimings` and an
/// `UninstalledScheduler` that can be returned to its pool.
#[cfg_attr(feature = "dev-context-only-utils", automock)]
// NOTE(review): presumably silences warnings coming from the code generated by `automock`;
// confirm before removing.
#[cfg_attr(feature = "dev-context-only-utils", allow(unused_attributes))]
pub trait InstalledScheduler: Send + Sync + Debug + 'static {
/// Returns the identifier of this scheduler (used in this file for logging only).
fn id(&self) -> SchedulerId;
/// Returns the `SchedulingContext` this scheduler is working with.
fn context(&self) -> &SchedulingContext;
/// Schedules `transaction` for execution under the given `task_id`.
///
/// Returns `Err(SchedulerAborted)` when the scheduler has aborted; within this file the
/// caller then obtains the actual error through `recover_error_after_abort()`.
fn schedule_execution(
&self,
transaction: RuntimeTransaction<SanitizedTransaction>,
task_id: OrderedTaskId,
) -> ScheduleResult;
/// Returns the `TransactionError` behind an abort.
///
/// Within this file it is called only after `schedule_execution()` has failed, while the
/// scheduler is still `SchedulerStatus::Active`.
fn recover_error_after_abort(&mut self) -> TransactionError;
/// Consumes the scheduler and returns its result-with-timings together with the
/// uninstalled scheduler.
///
/// Within this file `is_dropped` is `true` only when the wait originates from dropping
/// `BankWithSchedulerInner` (`WaitReason::DroppedFromBankForks`), and `false` for freezing
/// and for the timeout listener.
fn wait_for_termination(
self: Box<Self>,
is_dropped: bool,
) -> (ResultWithTimings, UninstalledSchedulerBox);
/// Pauses the scheduler; invoked for `WaitReason::PausedForRecentBlockhash` while the
/// scheduler stays installed.
fn pause_for_recent_blockhash(&mut self);
/// Unpauses the scheduler; invoked from
/// `BankWithScheduler::unpause_new_block_production_scheduler()` for an active
/// block-production scheduler.
fn unpause_after_taken(&self);
}
/// A scheduler that has been taken out of its bank by
/// `InstalledScheduler::wait_for_termination()`.
#[cfg_attr(feature = "dev-context-only-utils", automock)]
pub trait UninstalledScheduler: Send + Sync + Debug + 'static {
/// Consumes the scheduler, handing it back to its pool.
fn return_to_pool(self: Box<Self>);
}
/// Owned, type-erased installed scheduler.
pub type InstalledSchedulerBox = Box<dyn InstalledScheduler>;
/// Owned, type-erased uninstalled scheduler.
pub type UninstalledSchedulerBox = Box<dyn UninstalledScheduler>;
/// Shared, type-erased handle to a scheduler pool.
pub type InstalledSchedulerPoolArc = Arc<dyn InstalledSchedulerPool>;
/// Identifier of a scheduler, as returned by `InstalledScheduler::id()`.
pub type SchedulerId = u64;
/// The scheduling mode plus the bank (if any) a scheduler works on.
#[derive(Clone, Debug)]
pub struct SchedulingContext {
// Whether the scheduler is used for block verification or block production.
mode: SchedulingMode,
// `None` only for contexts created by `for_preallocation()`.
bank: Option<Arc<Bank>>,
}
impl SchedulingContext {
    /// Creates a bank-less context in block-production mode.
    ///
    /// Such a context reports `true` from [`Self::is_preallocated`].
    pub fn for_preallocation() -> Self {
        let mode = SchedulingMode::BlockProduction;
        Self { mode, bank: None }
    }

    /// Creates a context for `bank` with the given `mode`.
    #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
    pub(crate) fn new_with_mode(mode: SchedulingMode, bank: Arc<Bank>) -> Self {
        let bank = Some(bank);
        Self { mode, bank }
    }

    /// Creates a block-verification context for `bank`.
    #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
    fn for_verification(bank: Arc<Bank>) -> Self {
        let mode = SchedulingMode::BlockVerification;
        Self::new_with_mode(mode, bank)
    }

    /// Creates a block-production context for `bank`.
    #[cfg(feature = "dev-context-only-utils")]
    pub fn for_production(bank: Arc<Bank>) -> Self {
        let mode = SchedulingMode::BlockProduction;
        Self::new_with_mode(mode, bank)
    }

    /// Returns `true` when no bank is attached to this context.
    pub fn is_preallocated(&self) -> bool {
        self.bank().is_none()
    }

    /// Returns the scheduling mode of this context.
    pub fn mode(&self) -> SchedulingMode {
        let Self { mode, .. } = self;
        *mode
    }

    /// Returns the attached bank, or `None` for a preallocated context.
    pub fn bank(&self) -> Option<&Arc<Bank>> {
        Option::as_ref(&self.bank)
    }

    /// Returns the slot of the attached bank, or `None` for a preallocated context.
    pub fn slot(&self) -> Option<Slot> {
        let bank = self.bank.as_ref()?;
        Some(bank.slot())
    }
}
/// A transaction result paired with the execution timings accumulated alongside it.
pub type ResultWithTimings = (Result<()>, ExecuteTimings);
/// Why the scheduler of a bank is being waited on.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum WaitReason {
    /// The scheduler is terminated to obtain its final result.
    TerminatedToFreeze,
    /// The owning `BankWithSchedulerInner` is being dropped.
    DroppedFromBankForks,
    /// The scheduler is only paused; it stays installed and yields no result.
    PausedForRecentBlockhash,
}

impl WaitReason {
    /// Returns `true` only for [`WaitReason::PausedForRecentBlockhash`].
    pub fn is_paused(&self) -> bool {
        // Deliberately exhaustive (no `matches!()` / wildcard): adding a variant must force an
        // explicit decision here.
        match *self {
            Self::TerminatedToFreeze | Self::DroppedFromBankForks => false,
            Self::PausedForRecentBlockhash => true,
        }
    }

    /// Returns `true` only for [`WaitReason::DroppedFromBankForks`].
    pub fn is_dropped(&self) -> bool {
        // Deliberately exhaustive (no `matches!()` / wildcard): adding a variant must force an
        // explicit decision here.
        match *self {
            Self::TerminatedToFreeze | Self::PausedForRecentBlockhash => false,
            Self::DroppedFromBankForks => true,
        }
    }
}
/// Installation state of the scheduler of a bank.
///
/// Transitions visible in this file: `Active` -> `Stale` (timeout listener), `Stale` ->
/// `Active` (scheduling resumes), and `Active`/`Stale` -> `Unavailable` (termination).
// NOTE(review): the lint is allowed because the variants differ noticeably in size
// (`Stale` embeds a `ResultWithTimings`); confirm boxing was intentionally not chosen.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum SchedulerStatus {
/// No scheduler is installed, or it has already been taken out for termination.
Unavailable,
/// A scheduler is installed and can accept transactions.
Active(InstalledSchedulerBox),
/// The scheduler was returned to its pool by the timeout listener; the pool and the
/// result-with-timings so far are retained so that a scheduler can be taken again.
Stale(InstalledSchedulerPoolArc, ResultWithTimings),
}
impl SchedulerStatus {
    /// Builds the initial status: `Active` when a scheduler is given, `Unavailable` otherwise.
    fn new(scheduler: Option<InstalledSchedulerBox>) -> Self {
        match scheduler {
            Some(scheduler) => SchedulerStatus::Active(scheduler),
            None => SchedulerStatus::Unavailable,
        }
    }

    /// Moves from `Stale` to `Active`, using `f` to turn the retained pool and
    /// result-with-timings into a new installed scheduler.
    ///
    /// While `f` runs, `self` is `Unavailable`.
    ///
    /// # Panics
    ///
    /// Panics if the status is not `Stale`; the message reports the status that was actually
    /// found, and `self` is left `Unavailable`.
    fn transition_from_stale_to_active(
        &mut self,
        f: impl FnOnce(InstalledSchedulerPoolArc, ResultWithTimings) -> InstalledSchedulerBox,
    ) {
        // Match on the moved-out value: formatting `self` here would always print
        // `Unavailable`, because it has just been replaced.
        match mem::replace(self, Self::Unavailable) {
            Self::Stale(pool, result_with_timings) => {
                *self = Self::Active(f(pool, result_with_timings));
            }
            unexpected => panic!("transition to Active failed: {unexpected:?}"),
        }
    }

    /// Moves from `Active` to `Stale` if (and only if) the status is currently `Active`;
    /// otherwise does nothing.
    ///
    /// `f` consumes the installed scheduler and returns the pool and result-with-timings to
    /// retain. While `f` runs, `self` is `Unavailable`.
    fn maybe_transition_from_active_to_stale(
        &mut self,
        f: impl FnOnce(InstalledSchedulerBox) -> (InstalledSchedulerPoolArc, ResultWithTimings),
    ) {
        if !matches!(self, Self::Active(_scheduler)) {
            return;
        }
        let scheduler = match mem::replace(self, Self::Unavailable) {
            Self::Active(scheduler) => scheduler,
            // Guarded by the check above; report the real value should that ever break.
            unexpected => unreachable!("not active: {unexpected:?}"),
        };
        let (pool, result_with_timings) = f(scheduler);
        *self = Self::Stale(pool, result_with_timings);
    }

    /// Moves from `Active` to `Unavailable`, returning the installed scheduler.
    ///
    /// # Panics
    ///
    /// Panics if the status is not `Active`; the message reports the status that was actually
    /// found, and `self` is left `Unavailable`.
    fn transition_from_active_to_unavailable(&mut self) -> InstalledSchedulerBox {
        match mem::replace(self, Self::Unavailable) {
            Self::Active(scheduler) => scheduler,
            unexpected => panic!("transition to Unavailable failed: {unexpected:?}"),
        }
    }

    /// Moves from `Stale` to `Unavailable`, returning the retained result-with-timings and
    /// dropping the retained pool handle.
    ///
    /// # Panics
    ///
    /// Panics if the status is not `Stale`; the message reports the status that was actually
    /// found, and `self` is left `Unavailable`.
    fn transition_from_stale_to_unavailable(&mut self) -> ResultWithTimings {
        match mem::replace(self, Self::Unavailable) {
            Self::Stale(_pool, result_with_timings) => result_with_timings,
            unexpected => panic!("transition to Unavailable failed: {unexpected:?}"),
        }
    }

    /// Returns the installed scheduler without changing the status.
    ///
    /// # Panics
    ///
    /// Panics if the status is not `Active`.
    fn active_scheduler(&self) -> &InstalledSchedulerBox {
        let SchedulerStatus::Active(active_scheduler) = self else {
            panic!("not active: {self:?}");
        };
        active_scheduler
    }
}
/// A bank bundled with the (optional) scheduler installed into it.
///
/// It dereferences to `Arc<Bank>`; handles created with `clone_with_scheduler()` share the
/// same bank and scheduler status.
#[derive(Debug)]
pub struct BankWithScheduler {
// Shared so that every handle observes the same scheduler status.
inner: Arc<BankWithSchedulerInner>,
}
/// Shared state behind `BankWithScheduler`: the bank and its scheduler status.
///
/// Dropping it waits for the scheduler (see its `Drop` impl).
#[derive(Debug)]
pub struct BankWithSchedulerInner {
bank: Arc<Bank>,
// Read-locked on the scheduling fast path; write-locked for status transitions.
scheduler: InstalledSchedulerRwLock,
}
/// Lock guarding the scheduler status of a bank.
pub type InstalledSchedulerRwLock = RwLock<SchedulerStatus>;
impl BankWithScheduler {
    /// Pairs `bank` with an optional installed scheduler.
    ///
    /// # Panics
    ///
    /// Panics if `scheduler` is given but its context has no bank, or if that bank is not the
    /// very same `Arc` allocation as `bank`.
    #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
    pub(crate) fn new(bank: Arc<Bank>, scheduler: Option<InstalledSchedulerBox>) -> Self {
        // Never associate a bank with a scheduler that works on a different bank.
        if let Some(installed) = scheduler.as_ref() {
            let bank_in_context = installed.context().bank().unwrap();
            assert!(Arc::ptr_eq(&bank, bank_in_context));
        }
        let status = SchedulerStatus::new(scheduler);
        let inner = BankWithSchedulerInner {
            bank,
            scheduler: RwLock::new(status),
        };
        Self {
            inner: Arc::new(inner),
        }
    }

    /// Wraps `bank` with no scheduler installed.
    pub fn new_without_scheduler(bank: Arc<Bank>) -> Self {
        let no_scheduler = None;
        Self::new(bank, no_scheduler)
    }

    /// Returns another handle sharing the same bank and scheduler status.
    pub fn clone_with_scheduler(&self) -> BankWithScheduler {
        let inner = Arc::clone(&self.inner);
        BankWithScheduler { inner }
    }

    /// Returns a handle to the bank alone.
    pub fn clone_without_scheduler(&self) -> Arc<Bank> {
        Arc::clone(&self.inner.bank)
    }

    /// Registers a tick on the bank, passing the scheduler lock along.
    pub fn register_tick(&self, hash: &Hash) {
        let BankWithSchedulerInner { bank, scheduler } = &*self.inner;
        bank.register_tick(hash, scheduler);
    }

    /// Test helper forwarding to the bank's `do_fill_bank_with_ticks_for_tests()` with the
    /// scheduler lock of this handle.
    #[cfg(feature = "dev-context-only-utils")]
    pub fn fill_bank_with_ticks_for_tests(&self) {
        let scheduler = &self.inner.scheduler;
        self.do_fill_bank_with_ticks_for_tests(scheduler);
    }

    /// Returns `true` unless the status is `SchedulerStatus::Unavailable` (so both `Active`
    /// and `Stale` count as installed).
    pub fn has_installed_scheduler(&self) -> bool {
        let status = self.inner.scheduler.read().unwrap();
        !matches!(*status, SchedulerStatus::Unavailable)
    }

    /// Returns `true` only when the status is `Active` and the scheduler's context is in
    /// block-production mode.
    pub fn has_installed_active_bp_scheduler(&self) -> bool {
        let status = self.inner.scheduler.read().unwrap();
        match &*status {
            SchedulerStatus::Active(scheduler) => {
                matches!(scheduler.context().mode(), SchedulingMode::BlockProduction)
            }
            SchedulerStatus::Stale(..) | SchedulerStatus::Unavailable => false,
        }
    }

    /// Schedules every `(transaction, task_id)` pair on the active scheduler, stopping at the
    /// first failure.
    ///
    /// # Errors
    ///
    /// When the scheduler reports an abort, the concrete `TransactionError` is retrieved from
    /// it (or from the stale result) and returned.
    ///
    /// # Panics
    ///
    /// Panics if no scheduler is installed.
    pub fn schedule_transaction_executions(
        &self,
        transaction_with_task_ids: impl ExactSizeIterator<
            Item = (RuntimeTransaction<SanitizedTransaction>, OrderedTaskId),
        >,
    ) -> Result<()> {
        trace!(
            "schedule_transaction_executions(): {} txs",
            transaction_with_task_ids.len()
        );
        let schedule_result: ScheduleResult = self.inner.with_active_scheduler(|scheduler| {
            let mut pending = transaction_with_task_ids;
            pending.try_for_each(|(sanitized_transaction, task_id)| {
                scheduler.schedule_execution(sanitized_transaction, task_id)
            })
        });
        // The error lookup takes the write lock separately from the read lock used above; the
        // two are intentionally not atomic.
        schedule_result
            .map_err(|SchedulerAborted| self.inner.retrieve_error_after_schedule_failure())
    }

    /// Creates a timeout listener bound (weakly) to this bank's shared state.
    #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
    pub(crate) fn create_timeout_listener(&self) -> TimeoutListener {
        BankWithSchedulerInner::do_create_timeout_listener(&self.inner)
    }

    /// Test helper running the same scheduler teardown that dropping the shared state would.
    #[cfg(feature = "dev-context-only-utils")]
    pub fn drop_scheduler(&mut self) {
        BankWithSchedulerInner::drop_scheduler(&self.inner);
    }

    /// Unpauses the active scheduler, if any.
    ///
    /// # Panics
    ///
    /// Panics if the active scheduler is not in block-production mode.
    pub fn unpause_new_block_production_scheduler(&self) {
        let status = self.inner.scheduler.read().unwrap();
        let SchedulerStatus::Active(scheduler) = &*status else {
            return;
        };
        assert_matches!(scheduler.context().mode(), SchedulingMode::BlockProduction);
        scheduler.unpause_after_taken();
    }

    /// Pauses the scheduler behind `scheduler` for a recent-blockhash update.
    ///
    /// # Panics
    ///
    /// Panics if the wait unexpectedly produced a result.
    pub(crate) fn wait_for_paused_scheduler(bank: &Bank, scheduler: &InstalledSchedulerRwLock) {
        let reason = WaitReason::PausedForRecentBlockhash;
        let maybe_result_with_timings =
            BankWithSchedulerInner::wait_for_scheduler_termination(bank, scheduler, reason);
        assert!(
            maybe_result_with_timings.is_none(),
            "Premature result was returned from scheduler after paused (slot: {})",
            bank.slot(),
        );
    }

    /// Terminates the scheduler (if installed) and returns its result-with-timings.
    ///
    /// Returns `None` when no scheduler is installed.
    #[must_use]
    pub fn wait_for_completed_scheduler(&self) -> Option<ResultWithTimings> {
        let BankWithSchedulerInner { bank, scheduler } = &*self.inner;
        BankWithSchedulerInner::wait_for_scheduler_termination(
            bank,
            scheduler,
            WaitReason::TerminatedToFreeze,
        )
    }

    /// Terminates an active block-production scheduler, if there is one, logging the result;
    /// does nothing otherwise.
    pub fn ensure_return_abandoned_bp_scheduler_to_scheduler_pool(&self) {
        if !self.has_installed_active_bp_scheduler() {
            return;
        }
        let Some((result, _timings)) = self.wait_for_completed_scheduler() else {
            return;
        };
        info!(
            "Reaped cleared tpu_bank and returned abandoned bp scheduler: {} {:?}",
            self.slot(),
            result
        );
    }

    /// Returns a lock holding `SchedulerStatus::Unavailable`.
    pub const fn no_scheduler_available() -> InstalledSchedulerRwLock {
        RwLock::new(SchedulerStatus::Unavailable)
    }
}
impl BankWithSchedulerInner {
    /// Runs `f` with the active scheduler, re-activating a stale one first if needed.
    ///
    /// Returns `Err(SchedulerAborted)` without calling `f` when the status is `Stale` with an
    /// error result.
    ///
    /// # Panics
    ///
    /// Panics if no scheduler is installed, or if the pool hands out no scheduler on resume.
    fn with_active_scheduler(
        self: &Arc<Self>,
        f: impl FnOnce(&InstalledSchedulerBox) -> ScheduleResult,
    ) -> ScheduleResult {
        let scheduler = self.scheduler.read().unwrap();
        match &*scheduler {
            SchedulerStatus::Active(scheduler) => {
                // Fast path: a single read lock is all that is needed.
                f(scheduler)
            }
            SchedulerStatus::Stale(_pool, (result, _timings)) if result.is_err() => {
                trace!(
                    "with_active_scheduler: bank (slot: {}) has a stale aborted scheduler...",
                    self.bank.slot(),
                );
                Err(SchedulerAborted)
            }
            SchedulerStatus::Stale(pool, _result_with_timings) => {
                let pool = pool.clone();
                // The read guard must be released before the write lock is requested below.
                drop(scheduler);
                // The context is unconditionally built for block verification here.
                let context = SchedulingContext::for_verification(self.bank.clone());
                let mut scheduler = self.scheduler.write().unwrap();
                trace!("with_active_scheduler: {scheduler:?}");
                // NOTE(review): the status is not re-checked after re-locking; if something else
                // changed it in between, the transition below panics. Confirm callers cannot
                // race here.
                scheduler.transition_from_stale_to_active(|pool, result_with_timings| {
                    let scheduler = pool
                        .take_resumed_scheduler(context, result_with_timings)
                        .unwrap();
                    info!(
                        "with_active_scheduler: bank (slot: {}) got active, taking scheduler (id: \
                         {})",
                        self.bank.slot(),
                        scheduler.id(),
                    );
                    scheduler
                });
                drop(scheduler);
                let scheduler = self.scheduler.read().unwrap();
                // Register the new listener only once the read lock is held: the listener needs
                // the write lock, so it cannot flip the status back to `Stale` before `f` has
                // run on the active scheduler.
                pool.register_timeout_listener(self.do_create_timeout_listener());
                f(scheduler.active_scheduler())
            }
            SchedulerStatus::Unavailable => unreachable!("no installed scheduler"),
        }
    }

    /// Creates a listener that, when triggered, turns an `Active` scheduler `Stale` by
    /// terminating it and returning it to the pool.
    ///
    /// The listener holds only a weak reference; it does nothing if this state has already
    /// been dropped or if the lock is poisoned.
    fn do_create_timeout_listener(self: &Arc<Self>) -> TimeoutListener {
        let weak_bank = Arc::downgrade(self);
        TimeoutListener::new(move |pool| {
            let Some(bank) = weak_bank.upgrade() else {
                // The shared state is gone already; nothing left to return to the pool.
                return;
            };
            let Ok(mut scheduler) = bank.scheduler.write() else {
                // Poisoned lock: some other thread panicked while holding it.
                return;
            };
            scheduler.maybe_transition_from_active_to_stale(|scheduler| {
                let id = scheduler.id();
                let (result_with_timings, uninstalled_scheduler) =
                    scheduler.wait_for_termination(false);
                uninstalled_scheduler.return_to_pool();
                info!(
                    "timeout_listener: bank (slot: {}) got stale, returning scheduler (id: {})",
                    bank.bank.slot(),
                    id,
                );
                (pool, result_with_timings)
            });
            trace!("timeout_listener: {scheduler:?}");
        })
    }

    /// Retrieves the `TransactionError` behind a failed scheduling attempt.
    ///
    /// For an `Active` scheduler the error is recovered from the scheduler itself; for a
    /// `Stale` status with an error result, that error is cloned.
    ///
    /// # Panics
    ///
    /// Panics (as unreachable) for any other status, reporting that status.
    fn retrieve_error_after_schedule_failure(&self) -> TransactionError {
        let mut scheduler = self.scheduler.write().unwrap();
        match &mut *scheduler {
            SchedulerStatus::Active(scheduler) => scheduler.recover_error_after_abort(),
            SchedulerStatus::Stale(_pool, (result, _timings)) if result.is_err() => {
                result.clone().unwrap_err()
            }
            // Format the guarded status rather than `self.scheduler`: the lock is write-held
            // by this very thread, so its `Debug` output could only show `<locked>`.
            status => unreachable!("no error in {status:?}"),
        }
    }

    /// Terminates the scheduler because this state is being dropped.
    #[must_use]
    fn wait_for_completed_scheduler_from_drop(&self) -> Option<ResultWithTimings> {
        Self::wait_for_scheduler_termination(
            &self.bank,
            &self.scheduler,
            WaitReason::DroppedFromBankForks,
        )
    }

    /// Waits on the scheduler behind `scheduler` according to `reason`.
    ///
    /// * `Active` + paused reason: the scheduler is paused in place; returns `None`.
    /// * `Active` otherwise: the scheduler is terminated and returned to its pool; the status
    ///   becomes `Unavailable`; returns its result-with-timings.
    /// * `Stale` + paused reason: nothing happens; returns `None`.
    /// * `Stale` otherwise: the status becomes `Unavailable`; returns the retained
    ///   result-with-timings.
    /// * `Unavailable`: returns `None`.
    ///
    /// `bank` is used for logging only.
    #[must_use]
    fn wait_for_scheduler_termination(
        bank: &Bank,
        scheduler: &InstalledSchedulerRwLock,
        reason: WaitReason,
    ) -> Option<ResultWithTimings> {
        debug!(
            "wait_for_scheduler_termination(slot: {}, reason: {:?}): started at {:?}...",
            bank.slot(),
            reason,
            thread::current(),
        );
        let mut scheduler = scheduler.write().unwrap();
        let (was_noop, result_with_timings) = match &mut *scheduler {
            SchedulerStatus::Active(scheduler) if reason.is_paused() => {
                scheduler.pause_for_recent_blockhash();
                (false, None)
            }
            SchedulerStatus::Active(_scheduler) => {
                // `scheduler` here is the write guard (the arm binding is unused).
                let scheduler = scheduler.transition_from_active_to_unavailable();
                let (result_with_timings, uninstalled_scheduler) =
                    scheduler.wait_for_termination(reason.is_dropped());
                uninstalled_scheduler.return_to_pool();
                (false, Some(result_with_timings))
            }
            SchedulerStatus::Stale(_pool, _result_with_timings) if reason.is_paused() => {
                // Keep the stale state as is; only a terminating wait consumes it.
                (true, None)
            }
            SchedulerStatus::Stale(_pool, _result_with_timings) => {
                let result_with_timings = scheduler.transition_from_stale_to_unavailable();
                (true, Some(result_with_timings))
            }
            SchedulerStatus::Unavailable => (true, None),
        };
        debug!(
            "wait_for_scheduler_termination(slot: {}, reason: {:?}): noop: {:?}, result: {:?} at \
             {:?}...",
            bank.slot(),
            reason,
            was_noop,
            result_with_timings.as_ref().map(|(result, _)| result),
            thread::current(),
        );
        trace!("wait_for_scheduler_termination(result_with_timings: {result_with_timings:?})",);
        result_with_timings
    }

    /// Terminates the scheduler on teardown, logging (and discarding) any error it reports.
    ///
    /// Skipped entirely when the current thread is already panicking.
    fn drop_scheduler(&self) {
        if thread::panicking() {
            error!(
                "BankWithSchedulerInner::drop_scheduler(): slot: {} skipping due to already \
                 panicking...",
                self.bank.slot(),
            );
            return;
        }
        if let Some(Err(err)) = self
            .wait_for_completed_scheduler_from_drop()
            .map(|(result, _timings)| result)
        {
            warn!(
                "BankWithSchedulerInner::drop_scheduler(): slot: {} discarding error from \
                 scheduler: {:?}",
                self.bank.slot(),
                err,
            );
        }
    }
}
impl Drop for BankWithSchedulerInner {
    /// Terminates any still-installed scheduler when the last handle goes away.
    fn drop(&mut self) {
        Self::drop_scheduler(self);
    }
}
impl Deref for BankWithScheduler {
type Target = Arc<Bank>;
fn deref(&self) -> &Self::Target {
&self.inner.bank
}
}
#[cfg(test)]
mod tests {
use {
super::*,
crate::{
bank::test_utils::goto_end_of_slot_with_scheduler,
genesis_utils::{GenesisConfigInfo, create_genesis_config},
},
mockall::Sequence,
solana_system_transaction as system_transaction,
std::sync::Mutex,
};
// Builds a mocked installed scheduler bound to `bank`.
//
// Expectations are registered, in order, into one shared sequence:
// 1. exactly one `context()` call (made by `BankWithScheduler::new()`), and
// 2. for each flag in `is_dropped_flags`, one `wait_for_termination(flag)` call followed by
//    one `return_to_pool()` call on the uninstalled scheduler it returns.
//
// `f`, when given, can add further expectations (outside of the sequence) to the mock.
fn setup_mocked_scheduler_with_extra(
bank: Arc<Bank>,
is_dropped_flags: impl Iterator<Item = bool>,
f: Option<impl Fn(&mut MockInstalledScheduler)>,
) -> InstalledSchedulerBox {
let mut mock = MockInstalledScheduler::new();
// Shared behind `Arc<Mutex<..>>` because the `returning` closures below also need it.
let seq = Arc::new(Mutex::new(Sequence::new()));
mock.expect_context()
.times(1)
.in_sequence(&mut seq.lock().unwrap())
.return_const(SchedulingContext::for_verification(bank));
// Despite its name, `wait_reason` is the expected `is_dropped` argument.
for wait_reason in is_dropped_flags {
let seq_cloned = seq.clone();
mock.expect_wait_for_termination()
.with(mockall::predicate::eq(wait_reason))
.times(1)
.in_sequence(&mut seq.lock().unwrap())
.returning(move |_| {
// The uninstalled scheduler is created lazily, so its expectation is appended to the
// sequence only once `wait_for_termination()` has actually been called.
let mut mock_uninstalled = MockUninstalledScheduler::new();
mock_uninstalled
.expect_return_to_pool()
.times(1)
.in_sequence(&mut seq_cloned.lock().unwrap())
.returning(|| ());
(
(Ok(()), ExecuteTimings::default()),
Box::new(mock_uninstalled),
)
});
}
if let Some(f) = f {
f(&mut mock);
}
Box::new(mock)
}
// Same as `setup_mocked_scheduler_with_extra()` without any extra expectations.
fn setup_mocked_scheduler(
bank: Arc<Bank>,
is_dropped_flags: impl Iterator<Item = bool>,
) -> InstalledSchedulerBox {
setup_mocked_scheduler_with_extra(
bank,
is_dropped_flags,
// The turbofish pins the otherwise uninferable type of the absent callback.
None::<fn(&mut MockInstalledScheduler) -> ()>,
)
}
// Terminating an installed scheduler yields a result once, and uninstalls it.
#[test]
fn test_scheduler_normal_termination() {
agave_logger::setup();
let bank = Arc::new(Bank::default_for_tests());
let bank = BankWithScheduler::new(
bank.clone(),
Some(setup_mocked_scheduler(bank, [false].into_iter())),
);
assert!(bank.has_installed_scheduler());
assert_matches!(bank.wait_for_completed_scheduler(), Some(_));
assert!(!bank.has_installed_scheduler());
// A second wait is a no-op because the scheduler is gone already.
assert_matches!(bank.wait_for_completed_scheduler(), None);
}
// Waiting without any installed scheduler yields nothing.
#[test]
fn test_no_scheduler_termination() {
agave_logger::setup();
let bank = Arc::new(Bank::default_for_tests());
let bank = BankWithScheduler::new_without_scheduler(bank);
assert!(!bank.has_installed_scheduler());
assert_matches!(bank.wait_for_completed_scheduler(), None);
}
// Dropping the wrapper terminates the scheduler with `is_dropped == true`.
#[test]
fn test_scheduler_termination_from_drop() {
agave_logger::setup();
let bank = Arc::new(Bank::default_for_tests());
let bank = BankWithScheduler::new(
bank.clone(),
Some(setup_mocked_scheduler(bank, [true].into_iter())),
);
drop(bank);
}
// The mock expects exactly one pause while the slot is driven to its end, and the scheduler
// must still be terminable afterwards.
#[test]
fn test_scheduler_pause() {
agave_logger::setup();
let bank = Arc::new(crate::bank::tests::create_simple_test_bank(42));
let bank = BankWithScheduler::new(
bank.clone(),
Some(setup_mocked_scheduler_with_extra(
bank,
[false].into_iter(),
Some(|mocked: &mut MockInstalledScheduler| {
mocked
.expect_pause_for_recent_blockhash()
.times(1)
.returning(|| ());
}),
)),
);
goto_end_of_slot_with_scheduler(&bank);
assert_matches!(bank.wait_for_completed_scheduler(), Some(_));
}
// Schedules one transfer through a mocked scheduler that either accepts it or aborts with
// `InsufficientFundsForFee`, and checks that the outcome is propagated.
fn do_test_schedule_execution(should_succeed: bool) {
agave_logger::setup();
let GenesisConfigInfo {
genesis_config,
mint_keypair,
..
} = create_genesis_config(10_000);
let tx0 = RuntimeTransaction::from_transaction_for_tests(system_transaction::transfer(
&mint_keypair,
&solana_pubkey::new_rand(),
2,
genesis_config.hash(),
));
let bank = Arc::new(Bank::new_for_tests(&genesis_config));
// `[true]`: the scheduler is only terminated when `bank` is dropped at the end of this
// function.
let mocked_scheduler = setup_mocked_scheduler_with_extra(
bank.clone(),
[true].into_iter(),
Some(|mocked: &mut MockInstalledScheduler| {
if should_succeed {
mocked
.expect_schedule_execution()
.times(1)
.returning(|_, _| Ok(()));
} else {
mocked
.expect_schedule_execution()
.times(1)
.returning(|_, _| Err(SchedulerAborted));
// After an abort, the wrapper must ask the scheduler for the actual error.
mocked
.expect_recover_error_after_abort()
.times(1)
.returning(|| TransactionError::InsufficientFundsForFee);
}
}),
);
let bank = BankWithScheduler::new(bank, Some(mocked_scheduler));
let result = bank.schedule_transaction_executions([(tx0, 0)].into_iter());
if should_succeed {
assert_matches!(result, Ok(()));
} else {
assert_matches!(result, Err(TransactionError::InsufficientFundsForFee));
}
}
#[test]
fn test_schedule_execution_success() {
do_test_schedule_execution(true);
}
#[test]
fn test_schedule_execution_failure() {
do_test_schedule_execution(false);
}
}