1use {
24 crate::bank::Bank,
25 clone_solana_runtime_transaction::runtime_transaction::RuntimeTransaction,
26 clone_solana_sdk::{
27 clock::Slot,
28 hash::Hash,
29 transaction::{Result, SanitizedTransaction, TransactionError},
30 },
31 clone_solana_timings::ExecuteTimings,
32 clone_solana_unified_scheduler_logic::SchedulingMode,
33 log::*,
34 std::{
35 fmt::{self, Debug},
36 mem,
37 ops::Deref,
38 sync::{Arc, RwLock},
39 thread,
40 },
41};
42#[cfg(feature = "dev-context-only-utils")]
43use {mockall::automock, qualifier_attr::qualifiers};
44
45pub fn initialized_result_with_timings() -> ResultWithTimings {
46 (Ok(()), ExecuteTimings::default())
47}
48
49pub trait InstalledSchedulerPool: Send + Sync + Debug {
50 fn take_scheduler(&self, context: SchedulingContext) -> InstalledSchedulerBox {
53 self.take_resumed_scheduler(context, initialized_result_with_timings())
54 }
55
56 fn take_resumed_scheduler(
57 &self,
58 context: SchedulingContext,
59 result_with_timings: ResultWithTimings,
60 ) -> InstalledSchedulerBox;
61
62 fn register_timeout_listener(&self, timeout_listener: TimeoutListener);
70}
71
72#[derive(Debug)]
73pub struct SchedulerAborted;
74pub type ScheduleResult = std::result::Result<(), SchedulerAborted>;
75
76pub struct TimeoutListener {
77 callback: Box<dyn FnOnce(InstalledSchedulerPoolArc) + Sync + Send>,
78}
79
80impl TimeoutListener {
81 pub(crate) fn new(f: impl FnOnce(InstalledSchedulerPoolArc) + Sync + Send + 'static) -> Self {
82 Self {
83 callback: Box::new(f),
84 }
85 }
86
87 pub fn trigger(self, pool: InstalledSchedulerPoolArc) {
88 (self.callback)(pool);
89 }
90}
91
92impl Debug for TimeoutListener {
93 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
94 write!(f, "TimeoutListener({self:p})")
95 }
96}
97
98#[cfg_attr(doc, aquamarine::aquamarine)]
99#[cfg_attr(feature = "dev-context-only-utils", automock)]
146#[cfg_attr(feature = "dev-context-only-utils", allow(unused_attributes))]
149pub trait InstalledScheduler: Send + Sync + Debug + 'static {
150 fn id(&self) -> SchedulerId;
151 fn context(&self) -> &SchedulingContext;
152
153 fn schedule_execution(
176 &self,
177 transaction: RuntimeTransaction<SanitizedTransaction>,
178 index: usize,
179 ) -> ScheduleResult;
180
181 fn recover_error_after_abort(&mut self) -> TransactionError;
190
191 fn wait_for_termination(
204 self: Box<Self>,
205 is_dropped: bool,
206 ) -> (ResultWithTimings, UninstalledSchedulerBox);
207
208 fn pause_for_recent_blockhash(&mut self);
215}
216
217#[cfg_attr(feature = "dev-context-only-utils", automock)]
218pub trait UninstalledScheduler: Send + Sync + Debug + 'static {
219 fn return_to_pool(self: Box<Self>);
220}
221
222pub type InstalledSchedulerBox = Box<dyn InstalledScheduler>;
223pub type UninstalledSchedulerBox = Box<dyn UninstalledScheduler>;
224
225pub type InstalledSchedulerPoolArc = Arc<dyn InstalledSchedulerPool>;
226
227pub type SchedulerId = u64;
228
229#[derive(Clone, Debug)]
239pub struct SchedulingContext {
240 mode: SchedulingMode,
241 bank: Arc<Bank>,
242}
243
244impl SchedulingContext {
245 pub fn new(bank: Arc<Bank>) -> Self {
246 Self {
248 mode: SchedulingMode::BlockVerification,
249 bank,
250 }
251 }
252
253 pub fn new_with_mode(mode: SchedulingMode, bank: Arc<Bank>) -> Self {
254 Self { mode, bank }
255 }
256
257 #[cfg(feature = "dev-context-only-utils")]
258 pub fn for_production(bank: Arc<Bank>) -> Self {
259 Self {
260 mode: SchedulingMode::BlockProduction,
261 bank,
262 }
263 }
264
265 pub fn mode(&self) -> SchedulingMode {
266 self.mode
267 }
268
269 pub fn bank(&self) -> &Arc<Bank> {
270 &self.bank
271 }
272
273 pub fn slot(&self) -> Slot {
274 self.bank().slot()
275 }
276}
277
278pub type ResultWithTimings = (Result<()>, ExecuteTimings);
279
280#[derive(Debug, PartialEq, Eq, Clone, Copy)]
282enum WaitReason {
283 TerminatedToFreeze,
290 DroppedFromBankForks,
293 PausedForRecentBlockhash,
297}
298
299impl WaitReason {
300 pub fn is_paused(&self) -> bool {
301 match self {
304 WaitReason::PausedForRecentBlockhash => true,
305 WaitReason::TerminatedToFreeze | WaitReason::DroppedFromBankForks => false,
306 }
307 }
308
309 pub fn is_dropped(&self) -> bool {
310 match self {
313 WaitReason::DroppedFromBankForks => true,
314 WaitReason::TerminatedToFreeze | WaitReason::PausedForRecentBlockhash => false,
315 }
316 }
317}
318
319#[allow(clippy::large_enum_variant)]
320#[derive(Debug)]
321pub enum SchedulerStatus {
322 Unavailable,
329 Active(InstalledSchedulerBox),
334 Stale(InstalledSchedulerPoolArc, ResultWithTimings),
341}
342
343impl SchedulerStatus {
344 fn new(scheduler: Option<InstalledSchedulerBox>) -> Self {
345 match scheduler {
346 Some(scheduler) => SchedulerStatus::Active(scheduler),
347 None => SchedulerStatus::Unavailable,
348 }
349 }
350
351 fn transition_from_stale_to_active(
352 &mut self,
353 f: impl FnOnce(InstalledSchedulerPoolArc, ResultWithTimings) -> InstalledSchedulerBox,
354 ) {
355 let Self::Stale(pool, result_with_timings) = mem::replace(self, Self::Unavailable) else {
356 panic!("transition to Active failed: {self:?}");
357 };
358 *self = Self::Active(f(pool, result_with_timings));
359 }
360
361 fn maybe_transition_from_active_to_stale(
362 &mut self,
363 f: impl FnOnce(InstalledSchedulerBox) -> (InstalledSchedulerPoolArc, ResultWithTimings),
364 ) {
365 if !matches!(self, Self::Active(_scheduler)) {
366 return;
367 }
368 let Self::Active(scheduler) = mem::replace(self, Self::Unavailable) else {
369 unreachable!("not active: {self:?}");
370 };
371 let (pool, result_with_timings) = f(scheduler);
372 *self = Self::Stale(pool, result_with_timings);
373 }
374
375 fn transition_from_active_to_unavailable(&mut self) -> InstalledSchedulerBox {
376 let Self::Active(scheduler) = mem::replace(self, Self::Unavailable) else {
377 panic!("transition to Unavailable failed: {self:?}");
378 };
379 scheduler
380 }
381
382 fn transition_from_stale_to_unavailable(&mut self) -> ResultWithTimings {
383 let Self::Stale(_pool, result_with_timings) = mem::replace(self, Self::Unavailable) else {
384 panic!("transition to Unavailable failed: {self:?}");
385 };
386 result_with_timings
387 }
388
389 fn active_scheduler(&self) -> &InstalledSchedulerBox {
390 let SchedulerStatus::Active(active_scheduler) = self else {
391 panic!("not active: {self:?}");
392 };
393 active_scheduler
394 }
395}
396
397#[derive(Debug)]
415pub struct BankWithScheduler {
416 inner: Arc<BankWithSchedulerInner>,
417}
418
419#[derive(Debug)]
420pub struct BankWithSchedulerInner {
421 bank: Arc<Bank>,
422 scheduler: InstalledSchedulerRwLock,
423}
424pub type InstalledSchedulerRwLock = RwLock<SchedulerStatus>;
425
426impl BankWithScheduler {
427 #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
428 pub(crate) fn new(bank: Arc<Bank>, scheduler: Option<InstalledSchedulerBox>) -> Self {
429 if let Some(bank_in_context) = scheduler
430 .as_ref()
431 .map(|scheduler| scheduler.context().bank())
432 {
433 assert!(Arc::ptr_eq(&bank, bank_in_context));
434 }
435
436 Self {
437 inner: Arc::new(BankWithSchedulerInner {
438 bank,
439 scheduler: RwLock::new(SchedulerStatus::new(scheduler)),
440 }),
441 }
442 }
443
444 pub fn new_without_scheduler(bank: Arc<Bank>) -> Self {
445 Self::new(bank, None)
446 }
447
448 pub fn clone_with_scheduler(&self) -> BankWithScheduler {
449 BankWithScheduler {
450 inner: self.inner.clone(),
451 }
452 }
453
454 pub fn clone_without_scheduler(&self) -> Arc<Bank> {
455 self.inner.bank.clone()
456 }
457
458 pub fn register_tick(&self, hash: &Hash) {
459 self.inner.bank.register_tick(hash, &self.inner.scheduler);
460 }
461
462 #[cfg(feature = "dev-context-only-utils")]
463 pub fn fill_bank_with_ticks_for_tests(&self) {
464 self.do_fill_bank_with_ticks_for_tests(&self.inner.scheduler);
465 }
466
467 pub fn has_installed_scheduler(&self) -> bool {
468 !matches!(
469 &*self.inner.scheduler.read().unwrap(),
470 SchedulerStatus::Unavailable
471 )
472 }
473
474 pub fn schedule_transaction_executions(
482 &self,
483 transactions_with_indexes: impl ExactSizeIterator<
484 Item = (RuntimeTransaction<SanitizedTransaction>, usize),
485 >,
486 ) -> Result<()> {
487 trace!(
488 "schedule_transaction_executions(): {} txs",
489 transactions_with_indexes.len()
490 );
491
492 let schedule_result: ScheduleResult = self.inner.with_active_scheduler(|scheduler| {
493 for (sanitized_transaction, index) in transactions_with_indexes {
494 scheduler.schedule_execution(sanitized_transaction, index)?;
495 }
496 Ok(())
497 });
498
499 if schedule_result.is_err() {
500 return Err(self.inner.retrieve_error_after_schedule_failure());
508 }
509
510 Ok(())
511 }
512
513 #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
514 pub(crate) fn create_timeout_listener(&self) -> TimeoutListener {
515 self.inner.do_create_timeout_listener()
516 }
517
518 #[cfg(feature = "dev-context-only-utils")]
520 pub fn drop_scheduler(&mut self) {
521 self.inner.drop_scheduler();
522 }
523
524 pub(crate) fn wait_for_paused_scheduler(bank: &Bank, scheduler: &InstalledSchedulerRwLock) {
525 let maybe_result_with_timings = BankWithSchedulerInner::wait_for_scheduler_termination(
526 bank,
527 scheduler,
528 WaitReason::PausedForRecentBlockhash,
529 );
530 assert!(
531 maybe_result_with_timings.is_none(),
532 "Premature result was returned from scheduler after paused (slot: {})",
533 bank.slot(),
534 );
535 }
536
537 #[must_use]
538 pub fn wait_for_completed_scheduler(&self) -> Option<ResultWithTimings> {
539 BankWithSchedulerInner::wait_for_scheduler_termination(
540 &self.inner.bank,
541 &self.inner.scheduler,
542 WaitReason::TerminatedToFreeze,
543 )
544 }
545
546 pub const fn no_scheduler_available() -> InstalledSchedulerRwLock {
547 RwLock::new(SchedulerStatus::Unavailable)
548 }
549}
550
551impl BankWithSchedulerInner {
552 fn with_active_scheduler(
553 self: &Arc<Self>,
554 f: impl FnOnce(&InstalledSchedulerBox) -> ScheduleResult,
555 ) -> ScheduleResult {
556 let scheduler = self.scheduler.read().unwrap();
557 match &*scheduler {
558 SchedulerStatus::Active(scheduler) => {
559 f(scheduler)
561 }
562 SchedulerStatus::Stale(_pool, (result, _timings)) if result.is_err() => {
563 trace!(
564 "with_active_scheduler: bank (slot: {}) has a stale aborted scheduler...",
565 self.bank.slot(),
566 );
567 Err(SchedulerAborted)
568 }
569 SchedulerStatus::Stale(pool, _result_with_timings) => {
570 let pool = pool.clone();
571 drop(scheduler);
572
573 let context = SchedulingContext::new(self.bank.clone());
574 let mut scheduler = self.scheduler.write().unwrap();
575 trace!("with_active_scheduler: {:?}", scheduler);
576 scheduler.transition_from_stale_to_active(|pool, result_with_timings| {
577 let scheduler = pool.take_resumed_scheduler(context, result_with_timings);
578 info!(
579 "with_active_scheduler: bank (slot: {}) got active, taking scheduler (id: {})",
580 self.bank.slot(),
581 scheduler.id(),
582 );
583 scheduler
584 });
585 drop(scheduler);
586
587 let scheduler = self.scheduler.read().unwrap();
588 pool.register_timeout_listener(self.do_create_timeout_listener());
593 f(scheduler.active_scheduler())
594 }
595 SchedulerStatus::Unavailable => unreachable!("no installed scheduler"),
596 }
597 }
598
599 fn do_create_timeout_listener(self: &Arc<Self>) -> TimeoutListener {
600 let weak_bank = Arc::downgrade(self);
601 TimeoutListener::new(move |pool| {
602 let Some(bank) = weak_bank.upgrade() else {
603 return;
607 };
608
609 let Ok(mut scheduler) = bank.scheduler.write() else {
610 return;
612 };
613
614 scheduler.maybe_transition_from_active_to_stale(|scheduler| {
619 let id = scheduler.id();
625 let (result_with_timings, uninstalled_scheduler) =
626 scheduler.wait_for_termination(false);
627 uninstalled_scheduler.return_to_pool();
628 info!(
629 "timeout_listener: bank (slot: {}) got stale, returning scheduler (id: {})",
630 bank.bank.slot(),
631 id,
632 );
633 (pool, result_with_timings)
634 });
635 trace!("timeout_listener: {:?}", scheduler);
636 })
637 }
638
639 fn retrieve_error_after_schedule_failure(&self) -> TransactionError {
642 let mut scheduler = self.scheduler.write().unwrap();
643 match &mut *scheduler {
644 SchedulerStatus::Active(scheduler) => scheduler.recover_error_after_abort(),
645 SchedulerStatus::Stale(_pool, (result, _timings)) if result.is_err() => {
646 result.clone().unwrap_err()
647 }
648 _ => unreachable!("no error in {:?}", self.scheduler),
649 }
650 }
651
652 #[must_use]
653 fn wait_for_completed_scheduler_from_drop(&self) -> Option<ResultWithTimings> {
654 Self::wait_for_scheduler_termination(
655 &self.bank,
656 &self.scheduler,
657 WaitReason::DroppedFromBankForks,
658 )
659 }
660
661 #[must_use]
662 fn wait_for_scheduler_termination(
663 bank: &Bank,
664 scheduler: &InstalledSchedulerRwLock,
665 reason: WaitReason,
666 ) -> Option<ResultWithTimings> {
667 debug!(
668 "wait_for_scheduler_termination(slot: {}, reason: {:?}): started at {:?}...",
669 bank.slot(),
670 reason,
671 thread::current(),
672 );
673
674 let mut scheduler = scheduler.write().unwrap();
675 let (was_noop, result_with_timings) = match &mut *scheduler {
676 SchedulerStatus::Active(scheduler) if reason.is_paused() => {
677 scheduler.pause_for_recent_blockhash();
678 (false, None)
679 }
680 SchedulerStatus::Active(_scheduler) => {
681 let scheduler = scheduler.transition_from_active_to_unavailable();
682 let (result_with_timings, uninstalled_scheduler) =
683 scheduler.wait_for_termination(reason.is_dropped());
684 uninstalled_scheduler.return_to_pool();
685 (false, Some(result_with_timings))
686 }
687 SchedulerStatus::Stale(_pool, _result_with_timings) if reason.is_paused() => {
688 (true, None)
691 }
692 SchedulerStatus::Stale(_pool, _result_with_timings) => {
693 let result_with_timings = scheduler.transition_from_stale_to_unavailable();
694 (true, Some(result_with_timings))
695 }
696 SchedulerStatus::Unavailable => (true, None),
697 };
698 debug!(
699 "wait_for_scheduler_termination(slot: {}, reason: {:?}): noop: {:?}, result: {:?} at {:?}...",
700 bank.slot(),
701 reason,
702 was_noop,
703 result_with_timings.as_ref().map(|(result, _)| result),
704 thread::current(),
705 );
706 trace!(
707 "wait_for_scheduler_termination(result_with_timings: {:?})",
708 result_with_timings,
709 );
710
711 result_with_timings
712 }
713
714 fn drop_scheduler(&self) {
715 if thread::panicking() {
716 error!(
717 "BankWithSchedulerInner::drop_scheduler(): slot: {} skipping due to already panicking...",
718 self.bank.slot(),
719 );
720 return;
721 }
722
723 if let Some(Err(err)) = self
725 .wait_for_completed_scheduler_from_drop()
726 .map(|(result, _timings)| result)
727 {
728 warn!(
729 "BankWithSchedulerInner::drop_scheduler(): slot: {} discarding error from scheduler: {:?}",
730 self.bank.slot(),
731 err,
732 );
733 }
734 }
735}
736
737impl Drop for BankWithSchedulerInner {
738 fn drop(&mut self) {
739 self.drop_scheduler();
740 }
741}
742
743impl Deref for BankWithScheduler {
744 type Target = Arc<Bank>;
745
746 fn deref(&self) -> &Self::Target {
747 &self.inner.bank
748 }
749}
750
751#[cfg(test)]
752mod tests {
753 use {
754 super::*,
755 crate::{
756 bank::test_utils::goto_end_of_slot_with_scheduler,
757 genesis_utils::{create_genesis_config, GenesisConfigInfo},
758 },
759 assert_matches::assert_matches,
760 clone_solana_sdk::system_transaction,
761 mockall::Sequence,
762 std::sync::Mutex,
763 };
764
765 fn setup_mocked_scheduler_with_extra(
766 bank: Arc<Bank>,
767 is_dropped_flags: impl Iterator<Item = bool>,
768 f: Option<impl Fn(&mut MockInstalledScheduler)>,
769 ) -> InstalledSchedulerBox {
770 let mut mock = MockInstalledScheduler::new();
771 let seq = Arc::new(Mutex::new(Sequence::new()));
772
773 mock.expect_context()
774 .times(1)
775 .in_sequence(&mut seq.lock().unwrap())
776 .return_const(SchedulingContext::new(bank));
777
778 for wait_reason in is_dropped_flags {
779 let seq_cloned = seq.clone();
780 mock.expect_wait_for_termination()
781 .with(mockall::predicate::eq(wait_reason))
782 .times(1)
783 .in_sequence(&mut seq.lock().unwrap())
784 .returning(move |_| {
785 let mut mock_uninstalled = MockUninstalledScheduler::new();
786 mock_uninstalled
787 .expect_return_to_pool()
788 .times(1)
789 .in_sequence(&mut seq_cloned.lock().unwrap())
790 .returning(|| ());
791 (
792 (Ok(()), ExecuteTimings::default()),
793 Box::new(mock_uninstalled),
794 )
795 });
796 }
797
798 if let Some(f) = f {
799 f(&mut mock);
800 }
801
802 Box::new(mock)
803 }
804
805 fn setup_mocked_scheduler(
806 bank: Arc<Bank>,
807 is_dropped_flags: impl Iterator<Item = bool>,
808 ) -> InstalledSchedulerBox {
809 setup_mocked_scheduler_with_extra(
810 bank,
811 is_dropped_flags,
812 None::<fn(&mut MockInstalledScheduler) -> ()>,
813 )
814 }
815
816 #[test]
817 fn test_scheduler_normal_termination() {
818 clone_solana_logger::setup();
819
820 let bank = Arc::new(Bank::default_for_tests());
821 let bank = BankWithScheduler::new(
822 bank.clone(),
823 Some(setup_mocked_scheduler(bank, [false].into_iter())),
824 );
825 assert!(bank.has_installed_scheduler());
826 assert_matches!(bank.wait_for_completed_scheduler(), Some(_));
827
828 assert!(!bank.has_installed_scheduler());
831 assert_matches!(bank.wait_for_completed_scheduler(), None);
832 }
833
834 #[test]
835 fn test_no_scheduler_termination() {
836 clone_solana_logger::setup();
837
838 let bank = Arc::new(Bank::default_for_tests());
839 let bank = BankWithScheduler::new_without_scheduler(bank);
840
841 assert!(!bank.has_installed_scheduler());
843 assert_matches!(bank.wait_for_completed_scheduler(), None);
844 }
845
846 #[test]
847 fn test_scheduler_termination_from_drop() {
848 clone_solana_logger::setup();
849
850 let bank = Arc::new(Bank::default_for_tests());
851 let bank = BankWithScheduler::new(
852 bank.clone(),
853 Some(setup_mocked_scheduler(bank, [true].into_iter())),
854 );
855 drop(bank);
856 }
857
858 #[test]
859 fn test_scheduler_pause() {
860 clone_solana_logger::setup();
861
862 let bank = Arc::new(crate::bank::tests::create_simple_test_bank(42));
863 let bank = BankWithScheduler::new(
864 bank.clone(),
865 Some(setup_mocked_scheduler_with_extra(
866 bank,
867 [false].into_iter(),
868 Some(|mocked: &mut MockInstalledScheduler| {
869 mocked
870 .expect_pause_for_recent_blockhash()
871 .times(1)
872 .returning(|| ());
873 }),
874 )),
875 );
876 goto_end_of_slot_with_scheduler(&bank);
877 assert_matches!(bank.wait_for_completed_scheduler(), Some(_));
878 }
879
880 fn do_test_schedule_execution(should_succeed: bool) {
881 clone_solana_logger::setup();
882
883 let GenesisConfigInfo {
884 genesis_config,
885 mint_keypair,
886 ..
887 } = create_genesis_config(10_000);
888 let tx0 = RuntimeTransaction::from_transaction_for_tests(system_transaction::transfer(
889 &mint_keypair,
890 &clone_solana_pubkey::new_rand(),
891 2,
892 genesis_config.hash(),
893 ));
894 let bank = Arc::new(Bank::new_for_tests(&genesis_config));
895 let mocked_scheduler = setup_mocked_scheduler_with_extra(
896 bank.clone(),
897 [true].into_iter(),
898 Some(|mocked: &mut MockInstalledScheduler| {
899 if should_succeed {
900 mocked
901 .expect_schedule_execution()
902 .times(1)
903 .returning(|_, _| Ok(()));
904 } else {
905 mocked
906 .expect_schedule_execution()
907 .times(1)
908 .returning(|_, _| Err(SchedulerAborted));
909 mocked
910 .expect_recover_error_after_abort()
911 .times(1)
912 .returning(|| TransactionError::InsufficientFundsForFee);
913 }
914 }),
915 );
916
917 let bank = BankWithScheduler::new(bank, Some(mocked_scheduler));
918 let result = bank.schedule_transaction_executions([(tx0, 0)].into_iter());
919 if should_succeed {
920 assert_matches!(result, Ok(()));
921 } else {
922 assert_matches!(result, Err(TransactionError::InsufficientFundsForFee));
923 }
924 }
925
926 #[test]
927 fn test_schedule_execution_success() {
928 do_test_schedule_execution(true);
929 }
930
931 #[test]
932 fn test_schedule_execution_failure() {
933 do_test_schedule_execution(false);
934 }
935}