atlas_runtime/
installed_scheduler_pool.rs

1//! Transaction processing glue code, mainly consisting of Object-safe traits
2//!
3//! [InstalledSchedulerPool] lends one of pooled [InstalledScheduler]s as wrapped in
4//! [BankWithScheduler], which can be used by `ReplayStage` and `BankingStage` for transaction
5//! execution. After use, the scheduler will be returned to the pool.
6//!
7//! [InstalledScheduler] can be fed with [SanitizedTransaction]s. Then, it schedules those
8//! executions and commits those results into the associated _bank_.
9//!
10//! It's generally assumed that each [InstalledScheduler] is backed by multiple threads for
11//! parallel transaction processing and there are multiple independent schedulers inside a single
12//! instance of [InstalledSchedulerPool].
13//!
14//! Dynamic dispatch was inevitable due to the desire to piggyback on
15//! [BankForks](crate::bank_forks::BankForks)'s pruning for scheduler lifecycle management as the
16//! common place both for `ReplayStage` and `BankingStage` and the resultant need of invoking
17//! actual implementations provided by the dependent crate (`atlas-unified-scheduler-pool`, which
18//! in turn depends on `atlas-ledger`, which in turn depends on `atlas-runtime`), avoiding a
19//! cyclic dependency.
20//!
21//! See [InstalledScheduler] for visualized interaction.
22
23use {
24    crate::bank::Bank,
25    assert_matches::assert_matches,
26    log::*,
27    solana_clock::Slot,
28    solana_hash::Hash,
29    solana_runtime_transaction::runtime_transaction::RuntimeTransaction,
30    solana_svm_timings::ExecuteTimings,
31    solana_transaction::sanitized::SanitizedTransaction,
32    solana_transaction_error::{TransactionError, TransactionResult as Result},
33    solana_unified_scheduler_logic::SchedulingMode,
34    std::{
35        fmt::{self, Debug},
36        mem,
37        ops::Deref,
38        sync::{Arc, RwLock},
39        thread,
40    },
41};
42#[cfg(feature = "dev-context-only-utils")]
43use {mockall::automock, qualifier_attr::qualifiers};
44
45pub fn initialized_result_with_timings() -> ResultWithTimings {
46    (Ok(()), ExecuteTimings::default())
47}
48
49pub trait InstalledSchedulerPool: Send + Sync + Debug {
50    /// A very thin wrapper of [`Self::take_resumed_scheduler`] to take a scheduler from this pool
51    /// for a brand-new bank.
52    fn take_scheduler(&self, context: SchedulingContext) -> InstalledSchedulerBox {
53        self.take_resumed_scheduler(context, initialized_result_with_timings())
54    }
55
56    fn take_resumed_scheduler(
57        &self,
58        context: SchedulingContext,
59        result_with_timings: ResultWithTimings,
60    ) -> InstalledSchedulerBox;
61
62    /// Registers an opaque timeout listener.
63    ///
64    /// This method and the passed `struct` called [`TimeoutListener`] are very opaque by purpose.
65    /// Specifically, it doesn't provide any way to tell which listener is semantically associated
66    /// to which particular scheduler. That's because proper _unregistration_ is omitted at the
67    /// timing of scheduler returning to reduce latency of the normal block-verification code-path,
68    /// relying on eventual stale listener clean-up by `solScCleaner`.
69    fn register_timeout_listener(&self, timeout_listener: TimeoutListener);
70
71    fn uninstalled_from_bank_forks(self: Arc<Self>);
72}
73
/// Marker error carried by [`ScheduleResult`], indicating that the scheduler has been aborted.
///
/// It intentionally holds no detail; the actual error is obtained separately via
/// [`InstalledScheduler::recover_error_after_abort`].
#[derive(Debug)]
pub struct SchedulerAborted;
/// Outcome of scheduling: either `Ok(())` or [`SchedulerAborted`].
pub type ScheduleResult = std::result::Result<(), SchedulerAborted>;
77
/// An opaque, one-shot callback to be registered with
/// [`InstalledSchedulerPool::register_timeout_listener`].
pub struct TimeoutListener {
    // Boxed `FnOnce`: it can be invoked at most once, which is what `trigger()` does by
    // consuming the listener.
    callback: Box<dyn FnOnce(InstalledSchedulerPoolArc) + Sync + Send>,
}
81
82impl TimeoutListener {
83    pub(crate) fn new(f: impl FnOnce(InstalledSchedulerPoolArc) + Sync + Send + 'static) -> Self {
84        Self {
85            callback: Box::new(f),
86        }
87    }
88
89    pub fn trigger(self, pool: InstalledSchedulerPoolArc) {
90        (self.callback)(pool);
91    }
92}
93
94impl Debug for TimeoutListener {
95    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
96        write!(f, "TimeoutListener({self:p})")
97    }
98}
99
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Schedules, executes, and commits transactions under encapsulated implementation
///
/// The following chart illustrates the ownership/reference interaction between inter-dependent
/// objects across crates:
///
/// ```mermaid
/// graph TD
///     Bank["Arc#lt;Bank#gt;"]
///
///     subgraph atlas-runtime[<span style="font-size: 70%">atlas-runtime</span>]
///         BankForks;
///         BankWithScheduler;
///         Bank;
///         LoadExecuteAndCommitTransactions([<span style="font-size: 67%">load_execute_and_commit_transactions#lpar;#rpar;</span>]);
///         SchedulingContext;
///         InstalledSchedulerPool{{InstalledSchedulerPool}};
///         InstalledScheduler{{InstalledScheduler}};
///     end
///
///     subgraph atlas-unified-scheduler-pool[<span style="font-size: 70%">atlas-unified-scheduler-pool</span>]
///         SchedulerPool;
///         PooledScheduler;
///         ScheduleExecution(["schedule_execution()"]);
///     end
///
///     subgraph atlas-ledger[<span style="font-size: 60%">atlas-ledger</span>]
///         ExecuteBatch(["execute_batch()"]);
///     end
///
///     ScheduleExecution -. calls .-> ExecuteBatch;
///     BankWithScheduler -. dyn-calls .-> ScheduleExecution;
///     ExecuteBatch -. calls .-> LoadExecuteAndCommitTransactions;
///     linkStyle 0,1,2 stroke:gray,color:gray;
///
///     BankForks -- owns --> BankWithScheduler;
///     BankForks -- owns --> InstalledSchedulerPool;
///     BankWithScheduler -- refs --> Bank;
///     BankWithScheduler -- owns --> InstalledScheduler;
///     SchedulingContext -- refs --> Bank;
///     InstalledScheduler -- owns --> SchedulingContext;
///
///     SchedulerPool -- owns --> PooledScheduler;
///     SchedulerPool -. impls .-> InstalledSchedulerPool;
///     PooledScheduler -. impls .-> InstalledScheduler;
///     PooledScheduler -- refs --> SchedulerPool;
/// ```
#[cfg_attr(feature = "dev-context-only-utils", automock)]
// suppress false clippy complaints arising from mockall-derive:
//   warning: `#[must_use]` has no effect when applied to a struct field
#[cfg_attr(feature = "dev-context-only-utils", allow(unused_attributes))]
pub trait InstalledScheduler: Send + Sync + Debug + 'static {
    /// Returns the identifier of this scheduler (this module only uses it for logging).
    fn id(&self) -> SchedulerId;

    /// Returns the [`SchedulingContext`] this scheduler is currently working with.
    fn context(&self) -> &SchedulingContext;

    /// Schedule transaction for execution.
    ///
    /// This non-blocking function will return immediately without waiting for actual execution.
    ///
    /// Calling this is illegal as soon as `wait_for_termination()` is called. It would result in
    /// fatal logic error.
    ///
    /// Note that the returned result indicates whether the scheduler has been aborted due to a
    /// previously-scheduled bad transaction, which terminates further block verification. So,
    /// almost always, the returned error isn't due to the merely scheduling of the current
    /// transaction itself. At this point, calling this does nothing anymore while it's still safe
    /// to do. As soon as notified, callers are expected to stop processing upcoming transactions
    /// of the same `SchedulingContext` (i.e. same block). Internally, the aborted scheduler will
    /// be disposed cleanly, not repooled, after `wait_for_termination()` is called like
    /// not-aborted schedulers.
    ///
    /// Caller can acquire the error by calling a separate function called
    /// `recover_error_after_abort()`, which requires `&mut self`, instead of `&self`. This
    /// separation and the convoluted returned value semantics explained above are intentional to
    /// optimize the fast code-path of normal transaction scheduling to be multi-threaded at the
    /// cost of far slower error code-path while giving implementors increased flexibility by
    /// having `&mut`.
    fn schedule_execution(
        &self,
        transaction: RuntimeTransaction<SanitizedTransaction>,
        index: usize,
    ) -> ScheduleResult;

    /// Return the error which caused the scheduler to abort.
    ///
    /// Note that this must not be called until it's observed that `schedule_execution()` has
    /// returned `Err(SchedulerAborted)`. Violating this should `panic!()`.
    ///
    /// That said, calling this multiple times is completely acceptable after the error observation
    /// from `schedule_execution()`. While it's not guaranteed, the same `.clone()`-ed errors of
    /// the first bad transaction are usually returned across invocations.
    fn recover_error_after_abort(&mut self) -> TransactionError;

    /// Wait for a scheduler to terminate after processing.
    ///
    /// This function blocks the current thread while waiting for the scheduler to complete all of
    /// the executions for the scheduled transactions and to return the finalized
    /// `ResultWithTimings`. This function still blocks for short period of time even in the case
    /// of aborted schedulers to gracefully shutdown the scheduler (like thread joining).
    ///
    /// Along with the result being returned, this function also makes the scheduler itself
    /// uninstalled from the bank by transforming the consumed self.
    ///
    /// If no transaction is scheduled, the result and timing will be `Ok(())` and
    /// `ExecuteTimings::default()` respectively.
    ///
    /// `is_dropped` tells the scheduler whether the wait originates from the drop code-path: the
    /// callers in this module pass `true` only for the wait reason `DroppedFromBankForks`, and
    /// `false` otherwise (including the timeout listener).
    fn wait_for_termination(
        self: Box<Self>,
        is_dropped: bool,
    ) -> (ResultWithTimings, UninstalledSchedulerBox);

    /// Pause a scheduler after processing to update bank's recent blockhash.
    ///
    /// This function blocks the current thread like `wait_for_termination()`. However, the
    /// scheduler won't be consumed. This means the scheduler is responsible to retain the
    /// finalized `ResultWithTimings` internally until it's `wait_for_termination()`-ed to collect
    /// the result later.
    fn pause_for_recent_blockhash(&mut self);

    /// Unpause a block production scheduler, immediately after it's taken from the scheduler pool.
    ///
    /// This is rather a special-purposed method. Such a scheduler is initially paused due to a
    /// race condition between the poh thread and handler threads. So, it needs to be unpaused in
    /// order to start processing transactions by calling this.
    ///
    /// # Panics
    ///
    /// Panics if called on a block verification scheduler.
    fn unpause_after_taken(&self);
}
229
/// A scheduler which has been detached from its bank, as returned by
/// [`InstalledScheduler::wait_for_termination`].
#[cfg_attr(feature = "dev-context-only-utils", automock)]
pub trait UninstalledScheduler: Send + Sync + Debug + 'static {
    /// Consumes this scheduler to hand it back to its pool; what the pool does with it afterwards
    /// is up to the implementation.
    fn return_to_pool(self: Box<Self>);
}
234
/// Owned, dynamically-dispatched [`InstalledScheduler`].
pub type InstalledSchedulerBox = Box<dyn InstalledScheduler>;
/// Owned, dynamically-dispatched [`UninstalledScheduler`].
pub type UninstalledSchedulerBox = Box<dyn UninstalledScheduler>;

/// Shared, dynamically-dispatched [`InstalledSchedulerPool`].
pub type InstalledSchedulerPoolArc = Arc<dyn InstalledSchedulerPool>;

/// Identifier of a scheduler, as returned by [`InstalledScheduler::id`].
pub type SchedulerId = u64;
241
/// A small context to propagate a bank and its scheduling mode to the scheduler subsystem.
///
/// Note that this isn't called `SchedulerContext` because the contexts aren't associated with
/// schedulers one by one. A scheduler will use many SchedulingContexts during its lifetime.
/// "Scheduling" part of the context name refers to an abstract slice of time to schedule and
/// execute all transactions for a given bank for block verification or production. A context is
/// expected to be used by a particular scheduler only for that duration of the time and to be
/// disposed by the scheduler. Then, the scheduler may work on different banks with new
/// `SchedulingContext`s.
///
/// There's a special construction only used for scheduler preallocation, which has no bank.
/// Trying to use such a context normally elsewhere in the code-base is expected to panic.
#[derive(Clone, Debug)]
pub struct SchedulingContext {
    mode: SchedulingMode,
    // `None` only for the preallocation context (see `for_preallocation()`).
    bank: Option<Arc<Bank>>,
}
259
260impl SchedulingContext {
261    pub fn for_preallocation() -> Self {
262        Self {
263            mode: SchedulingMode::BlockProduction,
264            bank: None,
265        }
266    }
267
268    #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
269    pub(crate) fn new_with_mode(mode: SchedulingMode, bank: Arc<Bank>) -> Self {
270        Self {
271            mode,
272            bank: Some(bank),
273        }
274    }
275
276    #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
277    fn for_verification(bank: Arc<Bank>) -> Self {
278        Self::new_with_mode(SchedulingMode::BlockVerification, bank)
279    }
280
281    #[cfg(feature = "dev-context-only-utils")]
282    pub fn for_production(bank: Arc<Bank>) -> Self {
283        Self::new_with_mode(SchedulingMode::BlockProduction, bank)
284    }
285
286    pub fn is_preallocated(&self) -> bool {
287        self.bank.is_none()
288    }
289
290    pub fn mode(&self) -> SchedulingMode {
291        self.mode
292    }
293
294    pub fn bank(&self) -> Option<&Arc<Bank>> {
295        self.bank.as_ref()
296    }
297
298    pub fn slot(&self) -> Option<Slot> {
299        self.bank.as_ref().map(|bank| bank.slot())
300    }
301}
302
/// A transaction result (`Ok(())` or a `TransactionError`) paired with `ExecuteTimings`.
pub type ResultWithTimings = (Result<()>, ExecuteTimings);
304
/// A hint from the bank about the reason the caller is waiting on its scheduler.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum WaitReason {
    /// The bank wants its scheduler to terminate after the completion of transaction execution,
    /// in order to freeze itself immediately thereafter. This is by far the most normal wait
    /// reason.
    ///
    /// Note that waiting with this reason (i.e. `wait_for_completed_scheduler()`) must explicitly
    /// be done prior to Bank::freeze(). This can't be done inside Bank::freeze() implicitly, in
    /// order to keep it infallible.
    TerminatedToFreeze,
    /// The bank wants its scheduler to terminate just like `TerminatedToFreeze`, while
    /// indicating that Drop::drop() is the caller.
    DroppedFromBankForks,
    /// The bank wants its scheduler to pause after the completion without being returned to the
    /// pool. This is to update bank's recent blockhash and to collect scheduler's
    /// internally-held `ResultWithTimings` later.
    PausedForRecentBlockhash,
}

impl WaitReason {
    /// Returns `true` only for the reason which pauses the scheduler instead of terminating it.
    pub fn is_paused(&self) -> bool {
        // Deliberately spell out every variant rather than resorting to `matches!()`: adding a
        // new variant (say, `PausedForFooBar`) must force an explicit decision here.
        match *self {
            Self::TerminatedToFreeze | Self::DroppedFromBankForks => false,
            Self::PausedForRecentBlockhash => true,
        }
    }

    /// Returns `true` only for the reason which indicates that the wait is caused by dropping.
    pub fn is_dropped(&self) -> bool {
        // Deliberately spell out every variant rather than resorting to `matches!()`: adding a
        // new variant (say, `PausedForFooBar`) must force an explicit decision here.
        match *self {
            Self::TerminatedToFreeze | Self::PausedForRecentBlockhash => false,
            Self::DroppedFromBankForks => true,
        }
    }
}
343
/// The state of the scheduler associated with a bank, as held by [`BankWithSchedulerInner`].
// NOTE(review): presumably `Stale`'s inline `ResultWithTimings` is what trips
// `large_enum_variant`, and boxing it was deemed not worthwhile -- confirm.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum SchedulerStatus {
    /// Unified scheduler is disabled or installed scheduler is consumed by
    /// [`InstalledScheduler::wait_for_termination`]. Note that transition to [`Self::Unavailable`]
    /// from {[`Self::Active`], [`Self::Stale`]} is one-way (i.e. one-time) unlike [`Self::Active`]
    /// <=> [`Self::Stale`] below. Also, this variant is transiently used as a placeholder
    /// internally when transitioning scheduler statuses, which isn't observable unless panic is
    /// happening.
    Unavailable,
    /// Scheduler is installed into a bank; could be running or just be waiting for additional
    /// transactions. This will be transitioned to [`Self::Stale`] after certain time (i.e.
    /// `atlas_unified_scheduler_pool::DEFAULT_TIMEOUT_DURATION`) has passed if its bank hasn't
    /// been frozen since installed.
    Active(InstalledSchedulerBox),
    /// Scheduler has yet to freeze its associated bank even after it's taken too long since
    /// installed, resulting in returning the scheduler back to the pool. Later, this can
    /// immediately (i.e. transparently) be transitioned to [`Self::Active`] as soon as there's new
    /// transaction to be executed (= [`BankWithScheduler::schedule_transaction_executions`] is
    /// called, which internally calls [`BankWithSchedulerInner::with_active_scheduler`] to make
    /// the transition happen).
    Stale(InstalledSchedulerPoolArc, ResultWithTimings),
}
367
368impl SchedulerStatus {
369    fn new(scheduler: Option<InstalledSchedulerBox>) -> Self {
370        match scheduler {
371            Some(scheduler) => SchedulerStatus::Active(scheduler),
372            None => SchedulerStatus::Unavailable,
373        }
374    }
375
376    fn transition_from_stale_to_active(
377        &mut self,
378        f: impl FnOnce(InstalledSchedulerPoolArc, ResultWithTimings) -> InstalledSchedulerBox,
379    ) {
380        let Self::Stale(pool, result_with_timings) = mem::replace(self, Self::Unavailable) else {
381            panic!("transition to Active failed: {self:?}");
382        };
383        *self = Self::Active(f(pool, result_with_timings));
384    }
385
386    fn maybe_transition_from_active_to_stale(
387        &mut self,
388        f: impl FnOnce(InstalledSchedulerBox) -> (InstalledSchedulerPoolArc, ResultWithTimings),
389    ) {
390        if !matches!(self, Self::Active(_scheduler)) {
391            return;
392        }
393        let Self::Active(scheduler) = mem::replace(self, Self::Unavailable) else {
394            unreachable!("not active: {self:?}");
395        };
396        let (pool, result_with_timings) = f(scheduler);
397        *self = Self::Stale(pool, result_with_timings);
398    }
399
400    fn transition_from_active_to_unavailable(&mut self) -> InstalledSchedulerBox {
401        let Self::Active(scheduler) = mem::replace(self, Self::Unavailable) else {
402            panic!("transition to Unavailable failed: {self:?}");
403        };
404        scheduler
405    }
406
407    fn transition_from_stale_to_unavailable(&mut self) -> ResultWithTimings {
408        let Self::Stale(_pool, result_with_timings) = mem::replace(self, Self::Unavailable) else {
409            panic!("transition to Unavailable failed: {self:?}");
410        };
411        result_with_timings
412    }
413
414    fn active_scheduler(&self) -> &InstalledSchedulerBox {
415        let SchedulerStatus::Active(active_scheduler) = self else {
416            panic!("not active: {self:?}");
417        };
418        active_scheduler
419    }
420}
421
/// Very thin wrapper around `Arc<Bank>`
///
/// It brings type-safety against accidental mixing of bank and scheduler with different slots,
/// which is a pretty dangerous condition. Also, it guarantees to call `wait_for_termination()` via
/// `::drop()` by DropBankService, which receives `Vec<BankWithScheduler>` from
/// `BankForks::set_root()`'s pruning, mostly matching to `Arc<Bank>`'s lifetime by piggybacking on
/// the pruning.
///
/// Semantically, a scheduler is tightly coupled with a particular bank. But scheduler wasn't put
/// into Bank fields to avoid circular-references (a scheduler needs to refer to its accompanied
/// `Arc<Bank>`). BankWithScheduler behaves almost like `Arc<Bank>`. It only adds a few transaction
/// scheduling and scheduler management functions. For this reason, `bank` variable names should be
/// used for `BankWithScheduler` across codebase.
///
/// BankWithScheduler even implements Deref for convenience. And Clone is intentionally not
/// implemented to avoid ambiguity as to which to clone: BankWithScheduler or `Arc<Bank>`. Use
/// `clone_without_scheduler()` for `Arc<Bank>`. Otherwise, use `clone_with_scheduler()` (this
/// should be unusual outside scheduler code-path)
#[derive(Debug)]
pub struct BankWithScheduler {
    // Shared among all the clones made by `clone_with_scheduler()`.
    inner: Arc<BankWithSchedulerInner>,
}
443
/// The shared state behind [`BankWithScheduler`]: a bank and the status of its scheduler.
#[derive(Debug)]
pub struct BankWithSchedulerInner {
    bank: Arc<Bank>,
    // Lock-protected, because the status is transitioned through `&self` in this module (e.g.
    // by the timeout listener and by the wait functions).
    scheduler: InstalledSchedulerRwLock,
}
/// The lock-protected [`SchedulerStatus`] of a bank.
pub type InstalledSchedulerRwLock = RwLock<SchedulerStatus>;
450
451impl BankWithScheduler {
452    /// Creates a new `BankWithScheduler` from bank and its associated scheduler.
453    ///
454    /// # Panics
455    ///
456    /// Panics if `scheduler`'s scheduling context is unmatched to given bank or for scheduler
457    /// preallocation.
458    #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
459    pub(crate) fn new(bank: Arc<Bank>, scheduler: Option<InstalledSchedulerBox>) -> Self {
460        // Avoid the fatal situation in which bank is being associated with a scheduler associated
461        // to a different bank!
462        if let Some(bank_in_context) = scheduler
463            .as_ref()
464            .map(|scheduler| scheduler.context().bank().unwrap())
465        {
466            assert!(Arc::ptr_eq(&bank, bank_in_context));
467        }
468
469        Self {
470            inner: Arc::new(BankWithSchedulerInner {
471                bank,
472                scheduler: RwLock::new(SchedulerStatus::new(scheduler)),
473            }),
474        }
475    }
476
477    pub fn new_without_scheduler(bank: Arc<Bank>) -> Self {
478        Self::new(bank, None)
479    }
480
481    pub fn clone_with_scheduler(&self) -> BankWithScheduler {
482        BankWithScheduler {
483            inner: self.inner.clone(),
484        }
485    }
486
487    pub fn clone_without_scheduler(&self) -> Arc<Bank> {
488        self.inner.bank.clone()
489    }
490
491    pub fn register_tick(&self, hash: &Hash) {
492        self.inner.bank.register_tick(hash, &self.inner.scheduler);
493    }
494
495    #[cfg(feature = "dev-context-only-utils")]
496    pub fn fill_bank_with_ticks_for_tests(&self) {
497        self.do_fill_bank_with_ticks_for_tests(&self.inner.scheduler);
498    }
499
500    pub fn has_installed_scheduler(&self) -> bool {
501        !matches!(
502            &*self.inner.scheduler.read().unwrap(),
503            SchedulerStatus::Unavailable
504        )
505    }
506
507    /// Schedule the transaction as long as the scheduler hasn't been aborted.
508    ///
509    /// If the scheduler has been aborted, this doesn't schedule the transaction, instead just
510    /// return the error of prior scheduled transaction.
511    ///
512    /// Calling this will panic if the installed scheduler is Unavailable (the bank is
513    /// wait_for_termination()-ed or the unified scheduler is disabled in the first place).
514    pub fn schedule_transaction_executions(
515        &self,
516        transactions_with_indexes: impl ExactSizeIterator<
517            Item = (RuntimeTransaction<SanitizedTransaction>, usize),
518        >,
519    ) -> Result<()> {
520        trace!(
521            "schedule_transaction_executions(): {} txs",
522            transactions_with_indexes.len()
523        );
524
525        let schedule_result: ScheduleResult = self.inner.with_active_scheduler(|scheduler| {
526            for (sanitized_transaction, index) in transactions_with_indexes {
527                scheduler.schedule_execution(sanitized_transaction, index)?;
528            }
529            Ok(())
530        });
531
532        if schedule_result.is_err() {
533            // This write lock isn't atomic with the above the read lock. So, another thread
534            // could have called .recover_error_after_abort() while we're literally stuck at
535            // the gaps of these locks (i.e. this comment in source code wise) under extreme
536            // race conditions. Thus, .recover_error_after_abort() is made idempotetnt for that
537            // consideration in mind.
538            //
539            // Lastly, this non-atomic nature is intentional for optimizing the fast code-path
540            return Err(self.inner.retrieve_error_after_schedule_failure());
541        }
542
543        Ok(())
544    }
545
546    #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
547    pub(crate) fn create_timeout_listener(&self) -> TimeoutListener {
548        self.inner.do_create_timeout_listener()
549    }
550
551    // take needless &mut only to communicate its semantic mutability to humans...
552    #[cfg(feature = "dev-context-only-utils")]
553    pub fn drop_scheduler(&mut self) {
554        self.inner.drop_scheduler();
555    }
556
557    pub fn unpause_new_block_production_scheduler(&self) {
558        if let SchedulerStatus::Active(scheduler) = &*self.inner.scheduler.read().unwrap() {
559            assert_matches!(scheduler.context().mode(), SchedulingMode::BlockProduction);
560            scheduler.unpause_after_taken();
561        }
562    }
563
564    pub(crate) fn wait_for_paused_scheduler(bank: &Bank, scheduler: &InstalledSchedulerRwLock) {
565        let maybe_result_with_timings = BankWithSchedulerInner::wait_for_scheduler_termination(
566            bank,
567            scheduler,
568            WaitReason::PausedForRecentBlockhash,
569        );
570        assert!(
571            maybe_result_with_timings.is_none(),
572            "Premature result was returned from scheduler after paused (slot: {})",
573            bank.slot(),
574        );
575    }
576
577    #[must_use]
578    pub fn wait_for_completed_scheduler(&self) -> Option<ResultWithTimings> {
579        BankWithSchedulerInner::wait_for_scheduler_termination(
580            &self.inner.bank,
581            &self.inner.scheduler,
582            WaitReason::TerminatedToFreeze,
583        )
584    }
585
586    pub const fn no_scheduler_available() -> InstalledSchedulerRwLock {
587        RwLock::new(SchedulerStatus::Unavailable)
588    }
589}
590
591impl BankWithSchedulerInner {
    /// Runs `f` against the active scheduler, transparently re-activating a stale one first.
    ///
    /// - `Active`: `f` is called under a single read lock.
    /// - `Stale` with an `Err` result: `f` isn't called; `Err(SchedulerAborted)` is returned.
    /// - `Stale` otherwise: a scheduler is taken from the retained pool under the write lock, a
    ///   new timeout listener is registered, and then `f` is called under a fresh read lock.
    ///
    /// # Panics
    ///
    /// Panics if the status is `Unavailable`, or if the lock is poisoned.
    fn with_active_scheduler(
        self: &Arc<Self>,
        f: impl FnOnce(&InstalledSchedulerBox) -> ScheduleResult,
    ) -> ScheduleResult {
        let scheduler = self.scheduler.read().unwrap();
        match &*scheduler {
            SchedulerStatus::Active(scheduler) => {
                // This is the fast path, needing single read-lock most of time.
                f(scheduler)
            }
            SchedulerStatus::Stale(_pool, (result, _timings)) if result.is_err() => {
                trace!(
                    "with_active_scheduler: bank (slot: {}) has a stale aborted scheduler...",
                    self.bank.slot(),
                );
                Err(SchedulerAborted)
            }
            SchedulerStatus::Stale(pool, _result_with_timings) => {
                // Keep our own reference to the pool, because the borrow from the read guard
                // ends with the drop() right below.
                let pool = pool.clone();
                drop(scheduler);

                // Schedulers can be stale only if its mode is block-verification. So,
                // unconditional context construction for verification is okay here.
                let context = SchedulingContext::for_verification(self.bank.clone());
                // NOTE(review): the status is re-examined only inside
                // transition_from_stale_to_active(), which panics unless it's still Stale. This
                // presumably assumes no other thread changes the status between the drop()
                // above and this write lock -- confirm against the callers.
                let mut scheduler = self.scheduler.write().unwrap();
                trace!("with_active_scheduler: {scheduler:?}");
                scheduler.transition_from_stale_to_active(|pool, result_with_timings| {
                    let scheduler = pool.take_resumed_scheduler(context, result_with_timings);
                    info!(
                        "with_active_scheduler: bank (slot: {}) got active, taking scheduler (id: \
                         {})",
                        self.bank.slot(),
                        scheduler.id(),
                    );
                    scheduler
                });
                drop(scheduler);

                let scheduler = self.scheduler.read().unwrap();
                // Re-register a new timeout listener only after acquiring the read lock;
                // Otherwise, the listener would again put scheduler into Stale before the read
                // lock under an extremely-rare race condition, causing panic below in
                // active_scheduler().
                pool.register_timeout_listener(self.do_create_timeout_listener());
                f(scheduler.active_scheduler())
            }
            SchedulerStatus::Unavailable => unreachable!("no installed scheduler"),
        }
    }
641
642    fn do_create_timeout_listener(self: &Arc<Self>) -> TimeoutListener {
643        let weak_bank = Arc::downgrade(self);
644        TimeoutListener::new(move |pool| {
645            let Some(bank) = weak_bank.upgrade() else {
646                // BankWithSchedulerInner is already dropped, indicating successful and timely
647                // `wait_for_termination()` on the bank prior to this triggering of the timeout,
648                // rendering this callback invocation no-op.
649                return;
650            };
651
652            let Ok(mut scheduler) = bank.scheduler.write() else {
653                // BankWithScheduler's lock is poisoned...
654                return;
655            };
656
657            // Reaching here means that it's been awhile since this active scheduler is taken from
658            // the pool and yet it has yet to be `wait_for_termination()`-ed. To avoid unbounded
659            // thread creation under forky condition, return the scheduler for now, even if the
660            // bank could process more transactions later.
661            scheduler.maybe_transition_from_active_to_stale(|scheduler| {
662                // Return the installed scheduler back to the scheduler pool as soon as the
663                // scheduler indicates the completion of all currently-scheduled transaction
664                // executions by `atlas_unified_scheduler_pool::ThreadManager::end_session()`
665                // internally.
666
667                let id = scheduler.id();
668                let (result_with_timings, uninstalled_scheduler) =
669                    scheduler.wait_for_termination(false);
670                uninstalled_scheduler.return_to_pool();
671                info!(
672                    "timeout_listener: bank (slot: {}) got stale, returning scheduler (id: {})",
673                    bank.bank.slot(),
674                    id,
675                );
676                (pool, result_with_timings)
677            });
678            trace!("timeout_listener: {scheduler:?}");
679        })
680    }
681
682    /// This must not be called until `Err(SchedulerAborted)` is observed. Violating this should
683    /// `panic!()`.
684    fn retrieve_error_after_schedule_failure(&self) -> TransactionError {
685        let mut scheduler = self.scheduler.write().unwrap();
686        match &mut *scheduler {
687            SchedulerStatus::Active(scheduler) => scheduler.recover_error_after_abort(),
688            SchedulerStatus::Stale(_pool, (result, _timings)) if result.is_err() => {
689                result.clone().unwrap_err()
690            }
691            _ => unreachable!("no error in {:?}", self.scheduler),
692        }
693    }
694
695    #[must_use]
696    fn wait_for_completed_scheduler_from_drop(&self) -> Option<ResultWithTimings> {
697        Self::wait_for_scheduler_termination(
698            &self.bank,
699            &self.scheduler,
700            WaitReason::DroppedFromBankForks,
701        )
702    }
703
704    #[must_use]
705    fn wait_for_scheduler_termination(
706        bank: &Bank,
707        scheduler: &InstalledSchedulerRwLock,
708        reason: WaitReason,
709    ) -> Option<ResultWithTimings> {
710        debug!(
711            "wait_for_scheduler_termination(slot: {}, reason: {:?}): started at {:?}...",
712            bank.slot(),
713            reason,
714            thread::current(),
715        );
716
717        let mut scheduler = scheduler.write().unwrap();
718        let (was_noop, result_with_timings) = match &mut *scheduler {
719            SchedulerStatus::Active(scheduler) if reason.is_paused() => {
720                scheduler.pause_for_recent_blockhash();
721                (false, None)
722            }
723            SchedulerStatus::Active(_scheduler) => {
724                let scheduler = scheduler.transition_from_active_to_unavailable();
725                let (result_with_timings, uninstalled_scheduler) =
726                    scheduler.wait_for_termination(reason.is_dropped());
727                uninstalled_scheduler.return_to_pool();
728                (false, Some(result_with_timings))
729            }
730            SchedulerStatus::Stale(_pool, _result_with_timings) if reason.is_paused() => {
731                // Do nothing for pauses because the scheduler termination is guaranteed to be
732                // called later.
733                (true, None)
734            }
735            SchedulerStatus::Stale(_pool, _result_with_timings) => {
736                let result_with_timings = scheduler.transition_from_stale_to_unavailable();
737                (true, Some(result_with_timings))
738            }
739            SchedulerStatus::Unavailable => (true, None),
740        };
741        debug!(
742            "wait_for_scheduler_termination(slot: {}, reason: {:?}): noop: {:?}, result: {:?} at \
743             {:?}...",
744            bank.slot(),
745            reason,
746            was_noop,
747            result_with_timings.as_ref().map(|(result, _)| result),
748            thread::current(),
749        );
750        trace!("wait_for_scheduler_termination(result_with_timings: {result_with_timings:?})",);
751
752        result_with_timings
753    }
754
755    fn drop_scheduler(&self) {
756        if thread::panicking() {
757            error!(
758                "BankWithSchedulerInner::drop_scheduler(): slot: {} skipping due to already \
759                 panicking...",
760                self.bank.slot(),
761            );
762            return;
763        }
764
765        // There's no guarantee ResultWithTimings is available or not at all when being dropped.
766        if let Some(Err(err)) = self
767            .wait_for_completed_scheduler_from_drop()
768            .map(|(result, _timings)| result)
769        {
770            warn!(
771                "BankWithSchedulerInner::drop_scheduler(): slot: {} discarding error from \
772                 scheduler: {:?}",
773                self.bank.slot(),
774                err,
775            );
776        }
777    }
778}
779
780impl Drop for BankWithSchedulerInner {
781    fn drop(&mut self) {
782        self.drop_scheduler();
783    }
784}
785
786impl Deref for BankWithScheduler {
787    type Target = Arc<Bank>;
788
789    fn deref(&self) -> &Self::Target {
790        &self.inner.bank
791    }
792}
793
#[cfg(test)]
mod tests {
    use {
        super::*,
        crate::{
            bank::test_utils::goto_end_of_slot_with_scheduler,
            genesis_utils::{create_genesis_config, GenesisConfigInfo},
        },
        mockall::Sequence,
        solana_system_transaction as system_transaction,
        std::sync::Mutex,
    };

    /// Builds a mocked installed scheduler with strictly sequenced expectations.
    ///
    /// The mock expects, in order: one `context()` call, then for each flag in
    /// `is_dropped_flags` one `wait_for_termination(flag)` call, each followed by exactly one
    /// `return_to_pool()` on the uninstalled scheduler it hands back. `f`, when given, can
    /// register additional expectations on the mock.
    fn setup_mocked_scheduler_with_extra(
        bank: Arc<Bank>,
        is_dropped_flags: impl Iterator<Item = bool>,
        f: Option<impl Fn(&mut MockInstalledScheduler)>,
    ) -> InstalledSchedulerBox {
        // Shared so that the uninstalled scheduler created later inside the `returning`
        // closure can join the very same call sequence.
        let sequence = Arc::new(Mutex::new(Sequence::new()));
        let mut scheduler = MockInstalledScheduler::new();

        scheduler
            .expect_context()
            .times(1)
            .in_sequence(&mut sequence.lock().unwrap())
            .return_const(SchedulingContext::for_verification(bank));

        for is_dropped in is_dropped_flags {
            let sequence_for_return = sequence.clone();
            scheduler
                .expect_wait_for_termination()
                .with(mockall::predicate::eq(is_dropped))
                .times(1)
                .in_sequence(&mut sequence.lock().unwrap())
                .returning(move |_| {
                    let mut uninstalled = MockUninstalledScheduler::new();
                    uninstalled
                        .expect_return_to_pool()
                        .times(1)
                        .in_sequence(&mut sequence_for_return.lock().unwrap())
                        .returning(|| ());
                    (
                        (Ok(()), ExecuteTimings::default()),
                        Box::new(uninstalled),
                    )
                });
        }

        if let Some(customize) = f {
            customize(&mut scheduler);
        }

        Box::new(scheduler)
    }

    /// Same as `setup_mocked_scheduler_with_extra()` without any additional expectations.
    fn setup_mocked_scheduler(
        bank: Arc<Bank>,
        is_dropped_flags: impl Iterator<Item = bool>,
    ) -> InstalledSchedulerBox {
        let no_extra = None::<fn(&mut MockInstalledScheduler)>;
        setup_mocked_scheduler_with_extra(bank, is_dropped_flags, no_extra)
    }

    /// The first wait yields a result and uninstalls the scheduler; a second wait yields none.
    #[test]
    fn test_scheduler_normal_termination() {
        solana_logger::setup();

        let raw_bank = Arc::new(Bank::default_for_tests());
        let scheduler = setup_mocked_scheduler(raw_bank.clone(), [false].into_iter());
        let bank = BankWithScheduler::new(raw_bank, Some(scheduler));
        assert!(bank.has_installed_scheduler());
        assert_matches!(bank.wait_for_completed_scheduler(), Some(_));

        // Calling wait_for_completed_scheduler() again is fine; it simply returns no
        // ResultWithTimings this time.
        assert!(!bank.has_installed_scheduler());
        assert_matches!(bank.wait_for_completed_scheduler(), None);
    }

    /// Waiting on a bank created without a scheduler is a no-op.
    #[test]
    fn test_no_scheduler_termination() {
        solana_logger::setup();

        let raw_bank = Arc::new(Bank::default_for_tests());
        let bank = BankWithScheduler::new_without_scheduler(raw_bank);

        // With no scheduler installed, wait_for_completed_scheduler() has nothing to return.
        assert!(!bank.has_installed_scheduler());
        assert_matches!(bank.wait_for_completed_scheduler(), None);
    }

    /// Dropping the bank must wait for termination with the "dropped" flag set; the mock's
    /// expectations do the checking.
    #[test]
    fn test_scheduler_termination_from_drop() {
        solana_logger::setup();

        let raw_bank = Arc::new(Bank::default_for_tests());
        let scheduler = setup_mocked_scheduler(raw_bank.clone(), [true].into_iter());
        let bank = BankWithScheduler::new(raw_bank, Some(scheduler));
        drop(bank);
    }

    /// Going to the end of the slot pauses the scheduler exactly once, and a later wait still
    /// yields a result.
    #[test]
    fn test_scheduler_pause() {
        solana_logger::setup();

        let raw_bank = Arc::new(crate::bank::tests::create_simple_test_bank(42));
        let expect_single_pause = |mocked: &mut MockInstalledScheduler| {
            mocked
                .expect_pause_for_recent_blockhash()
                .times(1)
                .returning(|| ());
        };
        let scheduler = setup_mocked_scheduler_with_extra(
            raw_bank.clone(),
            [false].into_iter(),
            Some(expect_single_pause),
        );
        let bank = BankWithScheduler::new(raw_bank, Some(scheduler));
        goto_end_of_slot_with_scheduler(&bank);
        assert_matches!(bank.wait_for_completed_scheduler(), Some(_));
    }

    /// Schedules a single transfer against a mocked scheduler.
    ///
    /// With `should_succeed`, scheduling returns `Ok(())`. Otherwise the mock reports
    /// `SchedulerAborted` and the recovered `InsufficientFundsForFee` error must surface.
    fn do_test_schedule_execution(should_succeed: bool) {
        solana_logger::setup();

        let GenesisConfigInfo {
            genesis_config,
            mint_keypair,
            ..
        } = create_genesis_config(10_000);
        let recipient = solana_pubkey::new_rand();
        let transfer =
            system_transaction::transfer(&mint_keypair, &recipient, 2, genesis_config.hash());
        let tx0 = RuntimeTransaction::from_transaction_for_tests(transfer);
        let raw_bank = Arc::new(Bank::new_for_tests(&genesis_config));

        let configure = |mocked: &mut MockInstalledScheduler| {
            let schedule_expectation = mocked.expect_schedule_execution().times(1);
            if should_succeed {
                schedule_expectation.returning(|_, _| Ok(()));
            } else {
                schedule_expectation.returning(|_, _| Err(SchedulerAborted));
                mocked
                    .expect_recover_error_after_abort()
                    .times(1)
                    .returning(|| TransactionError::InsufficientFundsForFee);
            }
        };
        // `true`: the bank is dropped at the end of this function without an explicit wait.
        let mocked_scheduler = setup_mocked_scheduler_with_extra(
            raw_bank.clone(),
            [true].into_iter(),
            Some(configure),
        );

        let bank = BankWithScheduler::new(raw_bank, Some(mocked_scheduler));
        let outcome = bank.schedule_transaction_executions([(tx0, 0)].into_iter());
        if should_succeed {
            assert_matches!(outcome, Ok(()));
        } else {
            assert_matches!(outcome, Err(TransactionError::InsufficientFundsForFee));
        }
    }

    /// Scheduling succeeds when the scheduler accepts the transaction.
    #[test]
    fn test_schedule_execution_success() {
        let should_succeed = true;
        do_test_schedule_execution(should_succeed);
    }

    /// Scheduling surfaces the recovered error when the scheduler has aborted.
    #[test]
    fn test_schedule_execution_failure() {
        let should_succeed = false;
        do_test_schedule_execution(should_succeed);
    }
}