solana_runtime/
installed_scheduler_pool.rs

1//! Transaction processing glue code, mainly consisting of Object-safe traits
2//!
3//! [InstalledSchedulerPool] lends one of pooled [InstalledScheduler]s as wrapped in
4//! [BankWithScheduler], which can be used by `ReplayStage` and `BankingStage` for transaction
5//! execution. After use, the scheduler will be returned to the pool.
6//!
7//! [InstalledScheduler] can be fed with [SanitizedTransaction]s. Then, it schedules those
8//! executions and commits those results into the associated _bank_.
9//!
10//! It's generally assumed that each [InstalledScheduler] is backed by multiple threads for
11//! parallel transaction processing and there are multiple independent schedulers inside a single
12//! instance of [InstalledSchedulerPool].
13//!
14//! Dynamic dispatch was inevitable due to the desire to piggyback on
15//! [BankForks](crate::bank_forks::BankForks)'s pruning for scheduler lifecycle management as the
16//! common place both for `ReplayStage` and `BankingStage` and the resultant need of invoking
17//! actual implementations provided by the dependent crate (`solana-unified-scheduler-pool`, which
18//! in turn depends on `solana-ledger`, which in turn depends on `solana-runtime`), avoiding a
19//! cyclic dependency.
20//!
21//! See [InstalledScheduler] for visualized interaction.
22
23use {
24    crate::bank::Bank,
25    assert_matches::assert_matches,
26    log::*,
27    solana_clock::Slot,
28    solana_hash::Hash,
29    solana_runtime_transaction::runtime_transaction::RuntimeTransaction,
30    solana_timings::ExecuteTimings,
31    solana_transaction::sanitized::SanitizedTransaction,
32    solana_transaction_error::{TransactionError, TransactionResult as Result},
33    solana_unified_scheduler_logic::SchedulingMode,
34    std::{
35        fmt::{self, Debug},
36        mem,
37        ops::Deref,
38        sync::{Arc, RwLock},
39        thread,
40    },
41};
42#[cfg(feature = "dev-context-only-utils")]
43use {mockall::automock, qualifier_attr::qualifiers};
44
45pub fn initialized_result_with_timings() -> ResultWithTimings {
46    (Ok(()), ExecuteTimings::default())
47}
48
49pub trait InstalledSchedulerPool: Send + Sync + Debug {
50    /// A very thin wrapper of [`Self::take_resumed_scheduler`] to take a scheduler from this pool
51    /// for a brand-new bank.
52    fn take_scheduler(&self, context: SchedulingContext) -> InstalledSchedulerBox {
53        self.take_resumed_scheduler(context, initialized_result_with_timings())
54    }
55
56    fn take_resumed_scheduler(
57        &self,
58        context: SchedulingContext,
59        result_with_timings: ResultWithTimings,
60    ) -> InstalledSchedulerBox;
61
62    /// Registers an opaque timeout listener.
63    ///
64    /// This method and the passed `struct` called [`TimeoutListener`] are very opaque by purpose.
65    /// Specifically, it doesn't provide any way to tell which listener is semantically associated
66    /// to which particular scheduler. That's because proper _unregistration_ is omitted at the
67    /// timing of scheduler returning to reduce latency of the normal block-verification code-path,
68    /// relying on eventual stale listener clean-up by `solScCleaner`.
69    fn register_timeout_listener(&self, timeout_listener: TimeoutListener);
70}
71
/// Marker error returned by [`InstalledScheduler::schedule_execution`] once the scheduler has
/// been aborted. It deliberately carries no detail; the actual [`TransactionError`] is obtained
/// separately via [`InstalledScheduler::recover_error_after_abort`].
#[derive(Debug)]
pub struct SchedulerAborted;
/// Outcome of handing a transaction to a scheduler: `Ok(())` or `Err(SchedulerAborted)`.
pub type ScheduleResult = std::result::Result<(), SchedulerAborted>;
75
/// Opaque one-shot callback which is passed to
/// [`InstalledSchedulerPool::register_timeout_listener`] and later fired via
/// [`TimeoutListener::trigger`].
pub struct TimeoutListener {
    // Boxed `FnOnce`: the callback is invoked at most once, when `trigger()` consumes `self`.
    callback: Box<dyn FnOnce(InstalledSchedulerPoolArc) + Sync + Send>,
}
79
80impl TimeoutListener {
81    pub(crate) fn new(f: impl FnOnce(InstalledSchedulerPoolArc) + Sync + Send + 'static) -> Self {
82        Self {
83            callback: Box::new(f),
84        }
85    }
86
87    pub fn trigger(self, pool: InstalledSchedulerPoolArc) {
88        (self.callback)(pool);
89    }
90}
91
92impl Debug for TimeoutListener {
93    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
94        write!(f, "TimeoutListener({self:p})")
95    }
96}
97
98#[cfg_attr(doc, aquamarine::aquamarine)]
99/// Schedules, executes, and commits transactions under encapsulated implementation
100///
101/// The following chart illustrates the ownership/reference interaction between inter-dependent
102/// objects across crates:
103///
104/// ```mermaid
105/// graph TD
106///     Bank["Arc#lt;Bank#gt;"]
107///
108///     subgraph solana-runtime[<span style="font-size: 70%">solana-runtime</span>]
109///         BankForks;
110///         BankWithScheduler;
111///         Bank;
112///         LoadExecuteAndCommitTransactions([<span style="font-size: 67%">load_execute_and_commit_transactions#lpar;#rpar;</span>]);
113///         SchedulingContext;
114///         InstalledSchedulerPool{{InstalledSchedulerPool}};
115///         InstalledScheduler{{InstalledScheduler}};
116///     end
117///
118///     subgraph solana-unified-scheduler-pool[<span style="font-size: 70%">solana-unified-scheduler-pool</span>]
119///         SchedulerPool;
120///         PooledScheduler;
121///         ScheduleExecution(["schedule_execution()"]);
122///     end
123///
124///     subgraph solana-ledger[<span style="font-size: 60%">solana-ledger</span>]
125///         ExecuteBatch(["execute_batch()"]);
126///     end
127///
128///     ScheduleExecution -. calls .-> ExecuteBatch;
129///     BankWithScheduler -. dyn-calls .-> ScheduleExecution;
130///     ExecuteBatch -. calls .-> LoadExecuteAndCommitTransactions;
131///     linkStyle 0,1,2 stroke:gray,color:gray;
132///
133///     BankForks -- owns --> BankWithScheduler;
134///     BankForks -- owns --> InstalledSchedulerPool;
135///     BankWithScheduler -- refs --> Bank;
136///     BankWithScheduler -- owns --> InstalledScheduler;
137///     SchedulingContext -- refs --> Bank;
138///     InstalledScheduler -- owns --> SchedulingContext;
139///
140///     SchedulerPool -- owns --> PooledScheduler;
141///     SchedulerPool -. impls .-> InstalledSchedulerPool;
142///     PooledScheduler -. impls .-> InstalledScheduler;
143///     PooledScheduler -- refs --> SchedulerPool;
144/// ```
#[cfg_attr(feature = "dev-context-only-utils", automock)]
// suppress false clippy complaints arising from mockall-derive:
//   warning: `#[must_use]` has no effect when applied to a struct field
#[cfg_attr(feature = "dev-context-only-utils", allow(unused_attributes))]
pub trait InstalledScheduler: Send + Sync + Debug + 'static {
    /// Returns the identifier of this scheduler (used e.g. in log messages).
    fn id(&self) -> SchedulerId;
    /// Returns the [`SchedulingContext`] this scheduler is currently installed with.
    fn context(&self) -> &SchedulingContext;

    /// Schedule transaction for execution.
    ///
    /// This non-blocking function will return immediately without waiting for actual execution.
    ///
    /// Calling this is illegal as soon as `wait_for_termination()` is called. It would result in
    /// fatal logic error.
    ///
    /// Note that the returned result indicates whether the scheduler has been aborted due to a
    /// previously-scheduled bad transaction, which terminates further block verification. So,
    /// almost always, the returned error isn't due to the mere scheduling of the current
    /// transaction itself. At this point, calling this does nothing anymore while it's still safe
    /// to do. As soon as notified, callers are expected to stop processing upcoming transactions
    /// of the same `SchedulingContext` (i.e. same block). Internally, the aborted scheduler will
    /// be disposed cleanly, not repooled, after `wait_for_termination()` is called like
    /// not-aborted schedulers.
    ///
    /// Caller can acquire the error by calling a separate function called
    /// `recover_error_after_abort()`, which requires `&mut self`, instead of `&self`. This
    /// separation and the convoluted returned value semantics explained above are intentional to
    /// optimize the fast code-path of normal transaction scheduling to be multi-threaded at the
    /// cost of far slower error code-path while giving implementors increased flexibility by
    /// having &mut.
    fn schedule_execution(
        &self,
        transaction: RuntimeTransaction<SanitizedTransaction>,
        index: usize,
    ) -> ScheduleResult;

    /// Return the error which caused the scheduler to abort.
    ///
    /// Note that this must not be called until it's observed that `schedule_execution()` has
    /// returned `Err(SchedulerAborted)`. Violating this should `panic!()`.
    ///
    /// That said, calling this multiple times is completely acceptable after the error observation
    /// from `schedule_execution()`. While it's not guaranteed, the same `.clone()`-ed errors of
    /// the first bad transaction are usually returned across invocations.
    fn recover_error_after_abort(&mut self) -> TransactionError;

    /// Wait for a scheduler to terminate after processing.
    ///
    /// This function blocks the current thread while waiting for the scheduler to complete all of
    /// the executions for the scheduled transactions and to return the finalized
    /// `ResultWithTimings`. This function still blocks for a short period of time even in the case
    /// of aborted schedulers to gracefully shut down the scheduler (like thread joining).
    ///
    /// Along with the result being returned, this function also makes the scheduler itself
    /// uninstalled from the bank by transforming the consumed self.
    ///
    /// If no transaction is scheduled, the result and timing will be `Ok(())` and
    /// `ExecuteTimings::default()` respectively.
    ///
    /// `is_dropped` tells the implementation whether the wait was requested because the bank is
    /// being dropped (the `DroppedFromBankForks` wait reason in this module) rather than for any
    /// other reason; the timeout listener in this module passes `false`.
    fn wait_for_termination(
        self: Box<Self>,
        is_dropped: bool,
    ) -> (ResultWithTimings, UninstalledSchedulerBox);

    /// Pause a scheduler after processing to update bank's recent blockhash.
    ///
    /// This function blocks the current thread like wait_for_termination(). However, the scheduler
    /// won't be consumed. This means the scheduler is responsible for retaining the finalized
    /// `ResultWithTimings` internally until it's `wait_for_termination()`-ed to collect the result
    /// later.
    fn pause_for_recent_blockhash(&mut self);

    /// Unpause a block production scheduler, immediately after it's taken from the scheduler pool.
    ///
    /// This is rather a special-purposed method. Such a scheduler is initially paused due to a
    /// race condition between the poh thread and handler threads. So, it needs to be unpaused in
    /// order to start processing transactions by calling this.
    ///
    /// # Panics
    ///
    /// Panics if called on a block verification scheduler.
    fn unpause_after_taken(&self);
}
227
/// A scheduler which has been detached from its bank by
/// [`InstalledScheduler::wait_for_termination`] and is waiting to be handed back to its pool.
#[cfg_attr(feature = "dev-context-only-utils", automock)]
pub trait UninstalledScheduler: Send + Sync + Debug + 'static {
    /// Consumes the uninstalled scheduler and hands it back to its pool. Whether it's actually
    /// re-pooled or disposed is up to the implementation (see the note about aborted schedulers
    /// on [`InstalledScheduler::schedule_execution`]).
    fn return_to_pool(self: Box<Self>);
}
232
/// Owned, type-erased scheduler which is currently installed into a bank.
pub type InstalledSchedulerBox = Box<dyn InstalledScheduler>;
/// Owned, type-erased scheduler which has been uninstalled from its bank.
pub type UninstalledSchedulerBox = Box<dyn UninstalledScheduler>;

/// Shared, type-erased handle to a scheduler pool.
pub type InstalledSchedulerPoolArc = Arc<dyn InstalledSchedulerPool>;

/// Identifier of a scheduler, as returned by [`InstalledScheduler::id`].
pub type SchedulerId = u64;
239
/// A small context to propagate a bank and its scheduling mode to the scheduler subsystem.
///
/// Note that this isn't called `SchedulerContext` because the contexts aren't associated with
/// schedulers one by one. A scheduler will use many SchedulingContexts during its lifetime.
/// The "Scheduling" part of the context name refers to an abstract slice of time to schedule and
/// execute all transactions for a given bank for block verification or production. A context is
/// expected to be used by a particular scheduler only for that duration of the time and to be
/// disposed by the scheduler. Then, the scheduler may work on different banks with new
/// `SchedulingContext`s.
///
/// There's a special construction only used for scheduler preallocation, which has no bank.
/// Panics will be triggered if such a context is used normally across the code-base.
#[derive(Clone, Debug)]
pub struct SchedulingContext {
    // Whether the transactions are scheduled for block verification or block production.
    mode: SchedulingMode,
    // `None` only for the preallocation-purpose context (see `for_preallocation()`).
    bank: Option<Arc<Bank>>,
}
257
258impl SchedulingContext {
259    pub fn for_preallocation() -> Self {
260        Self {
261            mode: SchedulingMode::BlockProduction,
262            bank: None,
263        }
264    }
265
266    #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
267    pub(crate) fn new_with_mode(mode: SchedulingMode, bank: Arc<Bank>) -> Self {
268        Self {
269            mode,
270            bank: Some(bank),
271        }
272    }
273
274    #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
275    fn for_verification(bank: Arc<Bank>) -> Self {
276        Self::new_with_mode(SchedulingMode::BlockVerification, bank)
277    }
278
279    #[cfg(feature = "dev-context-only-utils")]
280    pub fn for_production(bank: Arc<Bank>) -> Self {
281        Self::new_with_mode(SchedulingMode::BlockProduction, bank)
282    }
283
284    pub fn is_preallocated(&self) -> bool {
285        self.bank.is_none()
286    }
287
288    pub fn mode(&self) -> SchedulingMode {
289        self.mode
290    }
291
292    pub fn bank(&self) -> Option<&Arc<Bank>> {
293        self.bank.as_ref()
294    }
295
296    pub fn slot(&self) -> Option<Slot> {
297        self.bank.as_ref().map(|bank| bank.slot())
298    }
299}
300
/// The transaction result of a scheduling session paired with the execution timings collected
/// along the way.
pub type ResultWithTimings = (Result<()>, ExecuteTimings);
302
/// A hint from the bank about the reason the caller is waiting on its scheduler.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum WaitReason {
    // The bank wants its scheduler to terminate after the completion of transaction execution, in
    // order to freeze itself immediately thereafter. This is by far the most normal wait reason.
    //
    // Note that `wait_for_termination(TerminatedToFreeze)` must explicitly be done prior
    // to Bank::freeze(). This can't be done inside Bank::freeze() implicitly, so that it stays
    // infallible.
    TerminatedToFreeze,
    // The bank wants its scheduler to terminate just like `TerminatedToFreeze` and indicate that
    // Drop::drop() is the caller.
    DroppedFromBankForks,
    // The bank wants its scheduler to pause after the completion without being returned to the
    // pool. This is to update bank's recent blockhash and to collect scheduler's internally-held
    // `ResultWithTimings` later.
    PausedForRecentBlockhash,
}

impl WaitReason {
    /// Returns `true` only for the pausing reason, i.e. when the scheduler must not be consumed.
    pub fn is_paused(&self) -> bool {
        // Keep this `match` exhaustive (no `matches!()`, no wildcard) so that adding a new
        // variant like `PausedForFooBar` forces an explicit decision here...
        match *self {
            Self::TerminatedToFreeze | Self::DroppedFromBankForks => false,
            Self::PausedForRecentBlockhash => true,
        }
    }

    /// Returns `true` only when the wait originates from dropping the bank.
    pub fn is_dropped(&self) -> bool {
        // Keep this `match` exhaustive (no `matches!()`, no wildcard) so that adding a new
        // variant like `PausedForFooBar` forces an explicit decision here...
        match *self {
            Self::TerminatedToFreeze | Self::PausedForRecentBlockhash => false,
            Self::DroppedFromBankForks => true,
        }
    }
}
341
/// Lifecycle state of the scheduler slot held by a [`BankWithScheduler`].
// NOTE(review): the clippy allow is presumably needed because `Stale` stores a
// `ResultWithTimings` inline, making the variants lopsided in size -- confirm.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum SchedulerStatus {
    /// Unified scheduler is disabled or installed scheduler is consumed by
    /// [`InstalledScheduler::wait_for_termination`]. Note that transition to [`Self::Unavailable`]
    /// from {[`Self::Active`], [`Self::Stale`]} is one-way (i.e. one-time) unlike [`Self::Active`]
    /// <=> [`Self::Stale`] below. Also, this variant is transiently used as a placeholder
    /// internally when transitioning scheduler statuses, which isn't observable unless a panic
    /// is happening.
    Unavailable,
    /// Scheduler is installed into a bank; could be running or just be waiting for additional
    /// transactions. This will be transitioned to [`Self::Stale`] after certain time (i.e.
    /// `solana_unified_scheduler_pool::DEFAULT_TIMEOUT_DURATION`) has passed if its bank hasn't
    /// been frozen since installed.
    Active(InstalledSchedulerBox),
    /// Scheduler has yet to freeze its associated bank even after it's taken too long since
    /// installed, resulting in returning the scheduler back to the pool. Later, this can
    /// immediately (i.e. transparently) be transitioned to [`Self::Active`] as soon as there's new
    /// transaction to be executed (= [`BankWithScheduler::schedule_transaction_executions`] is
    /// called, which internally calls [`BankWithSchedulerInner::with_active_scheduler`] to make
    /// the transition happen).
    Stale(InstalledSchedulerPoolArc, ResultWithTimings),
}
365
366impl SchedulerStatus {
367    fn new(scheduler: Option<InstalledSchedulerBox>) -> Self {
368        match scheduler {
369            Some(scheduler) => SchedulerStatus::Active(scheduler),
370            None => SchedulerStatus::Unavailable,
371        }
372    }
373
374    fn transition_from_stale_to_active(
375        &mut self,
376        f: impl FnOnce(InstalledSchedulerPoolArc, ResultWithTimings) -> InstalledSchedulerBox,
377    ) {
378        let Self::Stale(pool, result_with_timings) = mem::replace(self, Self::Unavailable) else {
379            panic!("transition to Active failed: {self:?}");
380        };
381        *self = Self::Active(f(pool, result_with_timings));
382    }
383
384    fn maybe_transition_from_active_to_stale(
385        &mut self,
386        f: impl FnOnce(InstalledSchedulerBox) -> (InstalledSchedulerPoolArc, ResultWithTimings),
387    ) {
388        if !matches!(self, Self::Active(_scheduler)) {
389            return;
390        }
391        let Self::Active(scheduler) = mem::replace(self, Self::Unavailable) else {
392            unreachable!("not active: {self:?}");
393        };
394        let (pool, result_with_timings) = f(scheduler);
395        *self = Self::Stale(pool, result_with_timings);
396    }
397
398    fn transition_from_active_to_unavailable(&mut self) -> InstalledSchedulerBox {
399        let Self::Active(scheduler) = mem::replace(self, Self::Unavailable) else {
400            panic!("transition to Unavailable failed: {self:?}");
401        };
402        scheduler
403    }
404
405    fn transition_from_stale_to_unavailable(&mut self) -> ResultWithTimings {
406        let Self::Stale(_pool, result_with_timings) = mem::replace(self, Self::Unavailable) else {
407            panic!("transition to Unavailable failed: {self:?}");
408        };
409        result_with_timings
410    }
411
412    fn active_scheduler(&self) -> &InstalledSchedulerBox {
413        let SchedulerStatus::Active(active_scheduler) = self else {
414            panic!("not active: {self:?}");
415        };
416        active_scheduler
417    }
418}
419
/// Very thin wrapper around `Arc<Bank>`
///
/// It brings type-safety against accidental mixing of bank and scheduler with different slots,
/// which is a pretty dangerous condition. Also, it guarantees to call wait_for_termination() via
/// ::drop() by DropBankService, which receives `Vec<BankWithScheduler>` from
/// BankForks::set_root()'s pruning, mostly matching to `Arc<Bank>`'s lifetime by piggybacking on
/// the pruning.
///
/// Semantically, a scheduler is tightly coupled with a particular bank. But scheduler wasn't put
/// into Bank fields to avoid circular-references (a scheduler needs to refer to its accompanied
/// `Arc<Bank>`). BankWithScheduler behaves almost like `Arc<Bank>`. It only adds a few
/// transaction scheduling and scheduler management functions. For this reason, `bank` variable
/// names should be used for `BankWithScheduler` across the codebase.
///
/// BankWithScheduler even implements Deref for convenience. And Clone is intentionally not
/// implemented to avoid ambiguity as to which to clone: BankWithScheduler or `Arc<Bank>`. Use
/// clone_without_scheduler() for `Arc<Bank>`. Otherwise, use clone_with_scheduler() (this should
/// be unusual outside the scheduler code-path).
#[derive(Debug)]
pub struct BankWithScheduler {
    // Shared so that `clone_with_scheduler()` yields handles to the very same bank/scheduler pair.
    inner: Arc<BankWithSchedulerInner>,
}
441
/// The shared state behind [`BankWithScheduler`]: a bank and the status of its scheduler.
#[derive(Debug)]
pub struct BankWithSchedulerInner {
    bank: Arc<Bank>,
    // Lock-protected scheduler status; read-locked on the normal scheduling code-path and
    // write-locked for status transitions.
    scheduler: InstalledSchedulerRwLock,
}
/// The lock type guarding a bank's [`SchedulerStatus`].
pub type InstalledSchedulerRwLock = RwLock<SchedulerStatus>;
448
449impl BankWithScheduler {
450    /// Creates a new `BankWithScheduler` from bank and its associated scheduler.
451    ///
452    /// # Panics
453    ///
454    /// Panics if `scheduler`'s scheduling context is unmatched to given bank or for scheduler
455    /// preallocation.
456    #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
457    pub(crate) fn new(bank: Arc<Bank>, scheduler: Option<InstalledSchedulerBox>) -> Self {
458        // Avoid the fatal situation in which bank is being associated with a scheduler associated
459        // to a different bank!
460        if let Some(bank_in_context) = scheduler
461            .as_ref()
462            .map(|scheduler| scheduler.context().bank().unwrap())
463        {
464            assert!(Arc::ptr_eq(&bank, bank_in_context));
465        }
466
467        Self {
468            inner: Arc::new(BankWithSchedulerInner {
469                bank,
470                scheduler: RwLock::new(SchedulerStatus::new(scheduler)),
471            }),
472        }
473    }
474
475    pub fn new_without_scheduler(bank: Arc<Bank>) -> Self {
476        Self::new(bank, None)
477    }
478
479    pub fn clone_with_scheduler(&self) -> BankWithScheduler {
480        BankWithScheduler {
481            inner: self.inner.clone(),
482        }
483    }
484
485    pub fn clone_without_scheduler(&self) -> Arc<Bank> {
486        self.inner.bank.clone()
487    }
488
489    pub fn register_tick(&self, hash: &Hash) {
490        self.inner.bank.register_tick(hash, &self.inner.scheduler);
491    }
492
493    #[cfg(feature = "dev-context-only-utils")]
494    pub fn fill_bank_with_ticks_for_tests(&self) {
495        self.do_fill_bank_with_ticks_for_tests(&self.inner.scheduler);
496    }
497
498    pub fn has_installed_scheduler(&self) -> bool {
499        !matches!(
500            &*self.inner.scheduler.read().unwrap(),
501            SchedulerStatus::Unavailable
502        )
503    }
504
505    /// Schedule the transaction as long as the scheduler hasn't been aborted.
506    ///
507    /// If the scheduler has been aborted, this doesn't schedule the transaction, instead just
508    /// return the error of prior scheduled transaction.
509    ///
510    /// Calling this will panic if the installed scheduler is Unavailable (the bank is
511    /// wait_for_termination()-ed or the unified scheduler is disabled in the first place).
512    pub fn schedule_transaction_executions(
513        &self,
514        transactions_with_indexes: impl ExactSizeIterator<
515            Item = (RuntimeTransaction<SanitizedTransaction>, usize),
516        >,
517    ) -> Result<()> {
518        trace!(
519            "schedule_transaction_executions(): {} txs",
520            transactions_with_indexes.len()
521        );
522
523        let schedule_result: ScheduleResult = self.inner.with_active_scheduler(|scheduler| {
524            for (sanitized_transaction, index) in transactions_with_indexes {
525                scheduler.schedule_execution(sanitized_transaction, index)?;
526            }
527            Ok(())
528        });
529
530        if schedule_result.is_err() {
531            // This write lock isn't atomic with the above the read lock. So, another thread
532            // could have called .recover_error_after_abort() while we're literally stuck at
533            // the gaps of these locks (i.e. this comment in source code wise) under extreme
534            // race conditions. Thus, .recover_error_after_abort() is made idempotetnt for that
535            // consideration in mind.
536            //
537            // Lastly, this non-atomic nature is intentional for optimizing the fast code-path
538            return Err(self.inner.retrieve_error_after_schedule_failure());
539        }
540
541        Ok(())
542    }
543
544    #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
545    pub(crate) fn create_timeout_listener(&self) -> TimeoutListener {
546        self.inner.do_create_timeout_listener()
547    }
548
549    // take needless &mut only to communicate its semantic mutability to humans...
550    #[cfg(feature = "dev-context-only-utils")]
551    pub fn drop_scheduler(&mut self) {
552        self.inner.drop_scheduler();
553    }
554
555    pub fn unpause_new_block_production_scheduler(&self) {
556        if let SchedulerStatus::Active(scheduler) = &*self.inner.scheduler.read().unwrap() {
557            assert_matches!(scheduler.context().mode(), SchedulingMode::BlockProduction);
558            scheduler.unpause_after_taken();
559        }
560    }
561
562    pub(crate) fn wait_for_paused_scheduler(bank: &Bank, scheduler: &InstalledSchedulerRwLock) {
563        let maybe_result_with_timings = BankWithSchedulerInner::wait_for_scheduler_termination(
564            bank,
565            scheduler,
566            WaitReason::PausedForRecentBlockhash,
567        );
568        assert!(
569            maybe_result_with_timings.is_none(),
570            "Premature result was returned from scheduler after paused (slot: {})",
571            bank.slot(),
572        );
573    }
574
575    #[must_use]
576    pub fn wait_for_completed_scheduler(&self) -> Option<ResultWithTimings> {
577        BankWithSchedulerInner::wait_for_scheduler_termination(
578            &self.inner.bank,
579            &self.inner.scheduler,
580            WaitReason::TerminatedToFreeze,
581        )
582    }
583
584    pub const fn no_scheduler_available() -> InstalledSchedulerRwLock {
585        RwLock::new(SchedulerStatus::Unavailable)
586    }
587}
588
589impl BankWithSchedulerInner {
    /// Runs `f` against the active scheduler of this bank, transparently re-activating a stale
    /// scheduler beforehand if needed.
    ///
    /// Depending on the current status:
    /// - `Active`: `f` is called under a single read lock (the fast path).
    /// - `Stale` with an error result: `f` isn't called and `Err(SchedulerAborted)` is returned.
    /// - `Stale` otherwise: a scheduler is taken again from the retained pool under the write
    ///   lock, a new timeout listener is registered, and then `f` is called under a read lock.
    ///
    /// # Panics
    ///
    /// Panics if the status is `Unavailable`.
    ///
    /// NOTE(review): in the stale path, the read lock is released before the write lock is
    /// acquired. Should another thread change the status in between, the transition would panic.
    /// Presumably callers are serialized per bank -- confirm.
    fn with_active_scheduler(
        self: &Arc<Self>,
        f: impl FnOnce(&InstalledSchedulerBox) -> ScheduleResult,
    ) -> ScheduleResult {
        let scheduler = self.scheduler.read().unwrap();
        match &*scheduler {
            SchedulerStatus::Active(scheduler) => {
                // This is the fast path, needing only a single read-lock most of the time.
                f(scheduler)
            }
            SchedulerStatus::Stale(_pool, (result, _timings)) if result.is_err() => {
                // The scheduler was aborted before going stale; don't bother to resume it.
                trace!(
                    "with_active_scheduler: bank (slot: {}) has a stale aborted scheduler...",
                    self.bank.slot(),
                );
                Err(SchedulerAborted)
            }
            SchedulerStatus::Stale(pool, _result_with_timings) => {
                // Keep our own handle of the pool for the listener registration below and
                // release the read lock, so that the write lock can be taken.
                let pool = pool.clone();
                drop(scheduler);

                // Schedulers can be stale only if their mode is block-verification. So,
                // unconditional context construction for verification is okay here.
                let context = SchedulingContext::for_verification(self.bank.clone());
                let mut scheduler = self.scheduler.write().unwrap();
                trace!("with_active_scheduler: {:?}", scheduler);
                scheduler.transition_from_stale_to_active(|pool, result_with_timings| {
                    let scheduler = pool.take_resumed_scheduler(context, result_with_timings);
                    info!(
                        "with_active_scheduler: bank (slot: {}) got active, taking scheduler (id: {})",
                        self.bank.slot(),
                        scheduler.id(),
                    );
                    scheduler
                });
                drop(scheduler);

                let scheduler = self.scheduler.read().unwrap();
                // Re-register a new timeout listener only after acquiring the read lock;
                // otherwise, the listener could put the scheduler into Stale again before the
                // read lock is taken under an extremely-rare race condition, causing a panic
                // below in active_scheduler().
                pool.register_timeout_listener(self.do_create_timeout_listener());
                f(scheduler.active_scheduler())
            }
            SchedulerStatus::Unavailable => unreachable!("no installed scheduler"),
        }
    }
638
639    fn do_create_timeout_listener(self: &Arc<Self>) -> TimeoutListener {
640        let weak_bank = Arc::downgrade(self);
641        TimeoutListener::new(move |pool| {
642            let Some(bank) = weak_bank.upgrade() else {
643                // BankWithSchedulerInner is already dropped, indicating successful and timely
644                // `wait_for_termination()` on the bank prior to this triggering of the timeout,
645                // rendering this callback invocation no-op.
646                return;
647            };
648
649            let Ok(mut scheduler) = bank.scheduler.write() else {
650                // BankWithScheduler's lock is poisoned...
651                return;
652            };
653
654            // Reaching here means that it's been awhile since this active scheduler is taken from
655            // the pool and yet it has yet to be `wait_for_termination()`-ed. To avoid unbounded
656            // thread creation under forky condition, return the scheduler for now, even if the
657            // bank could process more transactions later.
658            scheduler.maybe_transition_from_active_to_stale(|scheduler| {
659                // Return the installed scheduler back to the scheduler pool as soon as the
660                // scheduler indicates the completion of all currently-scheduled transaction
661                // executions by `solana_unified_scheduler_pool::ThreadManager::end_session()`
662                // internally.
663
664                let id = scheduler.id();
665                let (result_with_timings, uninstalled_scheduler) =
666                    scheduler.wait_for_termination(false);
667                uninstalled_scheduler.return_to_pool();
668                info!(
669                    "timeout_listener: bank (slot: {}) got stale, returning scheduler (id: {})",
670                    bank.bank.slot(),
671                    id,
672                );
673                (pool, result_with_timings)
674            });
675            trace!("timeout_listener: {:?}", scheduler);
676        })
677    }
678
679    /// This must not be called until `Err(SchedulerAborted)` is observed. Violating this should
680    /// `panic!()`.
681    fn retrieve_error_after_schedule_failure(&self) -> TransactionError {
682        let mut scheduler = self.scheduler.write().unwrap();
683        match &mut *scheduler {
684            SchedulerStatus::Active(scheduler) => scheduler.recover_error_after_abort(),
685            SchedulerStatus::Stale(_pool, (result, _timings)) if result.is_err() => {
686                result.clone().unwrap_err()
687            }
688            _ => unreachable!("no error in {:?}", self.scheduler),
689        }
690    }
691
692    #[must_use]
693    fn wait_for_completed_scheduler_from_drop(&self) -> Option<ResultWithTimings> {
694        Self::wait_for_scheduler_termination(
695            &self.bank,
696            &self.scheduler,
697            WaitReason::DroppedFromBankForks,
698        )
699    }
700
701    #[must_use]
702    fn wait_for_scheduler_termination(
703        bank: &Bank,
704        scheduler: &InstalledSchedulerRwLock,
705        reason: WaitReason,
706    ) -> Option<ResultWithTimings> {
707        debug!(
708            "wait_for_scheduler_termination(slot: {}, reason: {:?}): started at {:?}...",
709            bank.slot(),
710            reason,
711            thread::current(),
712        );
713
714        let mut scheduler = scheduler.write().unwrap();
715        let (was_noop, result_with_timings) = match &mut *scheduler {
716            SchedulerStatus::Active(scheduler) if reason.is_paused() => {
717                scheduler.pause_for_recent_blockhash();
718                (false, None)
719            }
720            SchedulerStatus::Active(_scheduler) => {
721                let scheduler = scheduler.transition_from_active_to_unavailable();
722                let (result_with_timings, uninstalled_scheduler) =
723                    scheduler.wait_for_termination(reason.is_dropped());
724                uninstalled_scheduler.return_to_pool();
725                (false, Some(result_with_timings))
726            }
727            SchedulerStatus::Stale(_pool, _result_with_timings) if reason.is_paused() => {
728                // Do nothing for pauses because the scheduler termination is guaranteed to be
729                // called later.
730                (true, None)
731            }
732            SchedulerStatus::Stale(_pool, _result_with_timings) => {
733                let result_with_timings = scheduler.transition_from_stale_to_unavailable();
734                (true, Some(result_with_timings))
735            }
736            SchedulerStatus::Unavailable => (true, None),
737        };
738        debug!(
739            "wait_for_scheduler_termination(slot: {}, reason: {:?}): noop: {:?}, result: {:?} at {:?}...",
740            bank.slot(),
741            reason,
742            was_noop,
743            result_with_timings.as_ref().map(|(result, _)| result),
744            thread::current(),
745        );
746        trace!(
747            "wait_for_scheduler_termination(result_with_timings: {:?})",
748            result_with_timings,
749        );
750
751        result_with_timings
752    }
753
754    fn drop_scheduler(&self) {
755        if thread::panicking() {
756            error!(
757                "BankWithSchedulerInner::drop_scheduler(): slot: {} skipping due to already panicking...",
758                self.bank.slot(),
759            );
760            return;
761        }
762
763        // There's no guarantee ResultWithTimings is available or not at all when being dropped.
764        if let Some(Err(err)) = self
765            .wait_for_completed_scheduler_from_drop()
766            .map(|(result, _timings)| result)
767        {
768            warn!(
769                "BankWithSchedulerInner::drop_scheduler(): slot: {} discarding error from scheduler: {:?}",
770                self.bank.slot(),
771                err,
772            );
773        }
774    }
775}
776
777impl Drop for BankWithSchedulerInner {
778    fn drop(&mut self) {
779        self.drop_scheduler();
780    }
781}
782
783impl Deref for BankWithScheduler {
784    type Target = Arc<Bank>;
785
786    fn deref(&self) -> &Self::Target {
787        &self.inner.bank
788    }
789}
790
791#[cfg(test)]
792mod tests {
793    use {
794        super::*,
795        crate::{
796            bank::test_utils::goto_end_of_slot_with_scheduler,
797            genesis_utils::{create_genesis_config, GenesisConfigInfo},
798        },
799        mockall::Sequence,
800        solana_system_transaction as system_transaction,
801        std::sync::Mutex,
802    };
803
804    fn setup_mocked_scheduler_with_extra(
805        bank: Arc<Bank>,
806        is_dropped_flags: impl Iterator<Item = bool>,
807        f: Option<impl Fn(&mut MockInstalledScheduler)>,
808    ) -> InstalledSchedulerBox {
809        let mut mock = MockInstalledScheduler::new();
810        let seq = Arc::new(Mutex::new(Sequence::new()));
811
812        mock.expect_context()
813            .times(1)
814            .in_sequence(&mut seq.lock().unwrap())
815            .return_const(SchedulingContext::for_verification(bank));
816
817        for wait_reason in is_dropped_flags {
818            let seq_cloned = seq.clone();
819            mock.expect_wait_for_termination()
820                .with(mockall::predicate::eq(wait_reason))
821                .times(1)
822                .in_sequence(&mut seq.lock().unwrap())
823                .returning(move |_| {
824                    let mut mock_uninstalled = MockUninstalledScheduler::new();
825                    mock_uninstalled
826                        .expect_return_to_pool()
827                        .times(1)
828                        .in_sequence(&mut seq_cloned.lock().unwrap())
829                        .returning(|| ());
830                    (
831                        (Ok(()), ExecuteTimings::default()),
832                        Box::new(mock_uninstalled),
833                    )
834                });
835        }
836
837        if let Some(f) = f {
838            f(&mut mock);
839        }
840
841        Box::new(mock)
842    }
843
844    fn setup_mocked_scheduler(
845        bank: Arc<Bank>,
846        is_dropped_flags: impl Iterator<Item = bool>,
847    ) -> InstalledSchedulerBox {
848        setup_mocked_scheduler_with_extra(
849            bank,
850            is_dropped_flags,
851            None::<fn(&mut MockInstalledScheduler) -> ()>,
852        )
853    }
854
855    #[test]
856    fn test_scheduler_normal_termination() {
857        solana_logger::setup();
858
859        let bank = Arc::new(Bank::default_for_tests());
860        let bank = BankWithScheduler::new(
861            bank.clone(),
862            Some(setup_mocked_scheduler(bank, [false].into_iter())),
863        );
864        assert!(bank.has_installed_scheduler());
865        assert_matches!(bank.wait_for_completed_scheduler(), Some(_));
866
867        // Repeating to call wait_for_completed_scheduler() is okay with no ResultWithTimings being
868        // returned.
869        assert!(!bank.has_installed_scheduler());
870        assert_matches!(bank.wait_for_completed_scheduler(), None);
871    }
872
873    #[test]
874    fn test_no_scheduler_termination() {
875        solana_logger::setup();
876
877        let bank = Arc::new(Bank::default_for_tests());
878        let bank = BankWithScheduler::new_without_scheduler(bank);
879
880        // Calling wait_for_completed_scheduler() is noop, when no scheduler is installed.
881        assert!(!bank.has_installed_scheduler());
882        assert_matches!(bank.wait_for_completed_scheduler(), None);
883    }
884
885    #[test]
886    fn test_scheduler_termination_from_drop() {
887        solana_logger::setup();
888
889        let bank = Arc::new(Bank::default_for_tests());
890        let bank = BankWithScheduler::new(
891            bank.clone(),
892            Some(setup_mocked_scheduler(bank, [true].into_iter())),
893        );
894        drop(bank);
895    }
896
897    #[test]
898    fn test_scheduler_pause() {
899        solana_logger::setup();
900
901        let bank = Arc::new(crate::bank::tests::create_simple_test_bank(42));
902        let bank = BankWithScheduler::new(
903            bank.clone(),
904            Some(setup_mocked_scheduler_with_extra(
905                bank,
906                [false].into_iter(),
907                Some(|mocked: &mut MockInstalledScheduler| {
908                    mocked
909                        .expect_pause_for_recent_blockhash()
910                        .times(1)
911                        .returning(|| ());
912                }),
913            )),
914        );
915        goto_end_of_slot_with_scheduler(&bank);
916        assert_matches!(bank.wait_for_completed_scheduler(), Some(_));
917    }
918
919    fn do_test_schedule_execution(should_succeed: bool) {
920        solana_logger::setup();
921
922        let GenesisConfigInfo {
923            genesis_config,
924            mint_keypair,
925            ..
926        } = create_genesis_config(10_000);
927        let tx0 = RuntimeTransaction::from_transaction_for_tests(system_transaction::transfer(
928            &mint_keypair,
929            &solana_pubkey::new_rand(),
930            2,
931            genesis_config.hash(),
932        ));
933        let bank = Arc::new(Bank::new_for_tests(&genesis_config));
934        let mocked_scheduler = setup_mocked_scheduler_with_extra(
935            bank.clone(),
936            [true].into_iter(),
937            Some(|mocked: &mut MockInstalledScheduler| {
938                if should_succeed {
939                    mocked
940                        .expect_schedule_execution()
941                        .times(1)
942                        .returning(|_, _| Ok(()));
943                } else {
944                    mocked
945                        .expect_schedule_execution()
946                        .times(1)
947                        .returning(|_, _| Err(SchedulerAborted));
948                    mocked
949                        .expect_recover_error_after_abort()
950                        .times(1)
951                        .returning(|| TransactionError::InsufficientFundsForFee);
952                }
953            }),
954        );
955
956        let bank = BankWithScheduler::new(bank, Some(mocked_scheduler));
957        let result = bank.schedule_transaction_executions([(tx0, 0)].into_iter());
958        if should_succeed {
959            assert_matches!(result, Ok(()));
960        } else {
961            assert_matches!(result, Err(TransactionError::InsufficientFundsForFee));
962        }
963    }
964
965    #[test]
966    fn test_schedule_execution_success() {
967        do_test_schedule_execution(true);
968    }
969
970    #[test]
971    fn test_schedule_execution_failure() {
972        do_test_schedule_execution(false);
973    }
974}