solana_runtime/installed_scheduler_pool.rs
//! Transaction processing glue code, mainly consisting of object-safe traits.
2//!
//! [InstalledSchedulerPool] lends one of its pooled [InstalledScheduler]s, wrapped in a
//! [BankWithScheduler], which can be used by `ReplayStage` and `BankingStage` for transaction
//! execution. After use, the scheduler is returned to the pool.
6//!
7//! [InstalledScheduler] can be fed with [SanitizedTransaction]s. Then, it schedules those
8//! executions and commits those results into the associated _bank_.
9//!
10//! It's generally assumed that each [InstalledScheduler] is backed by multiple threads for
11//! parallel transaction processing and there are multiple independent schedulers inside a single
12//! instance of [InstalledSchedulerPool].
13//!
//! Dynamic dispatch was inevitable. We want to piggyback on
//! [BankForks](crate::bank_forks::BankForks)'s pruning for scheduler lifecycle management, as it's
//! the common place for both `ReplayStage` and `BankingStage`. That in turn requires invoking the
//! actual implementations provided by a dependent crate (`solana-unified-scheduler-pool`, which
//! depends on `solana-ledger`, which depends on `solana-runtime`) without introducing a cyclic
//! dependency.
20//!
21//! See [InstalledScheduler] for visualized interaction.
22
23use {
24 crate::bank::Bank,
25 assert_matches::assert_matches,
26 log::*,
27 solana_clock::Slot,
28 solana_hash::Hash,
29 solana_runtime_transaction::runtime_transaction::RuntimeTransaction,
30 solana_timings::ExecuteTimings,
31 solana_transaction::sanitized::SanitizedTransaction,
32 solana_transaction_error::{TransactionError, TransactionResult as Result},
33 solana_unified_scheduler_logic::SchedulingMode,
34 std::{
35 fmt::{self, Debug},
36 mem,
37 ops::Deref,
38 sync::{Arc, RwLock},
39 thread,
40 },
41};
42#[cfg(feature = "dev-context-only-utils")]
43use {mockall::automock, qualifier_attr::qualifiers};
44
45pub fn initialized_result_with_timings() -> ResultWithTimings {
46 (Ok(()), ExecuteTimings::default())
47}
48
49pub trait InstalledSchedulerPool: Send + Sync + Debug {
50 /// A very thin wrapper of [`Self::take_resumed_scheduler`] to take a scheduler from this pool
51 /// for a brand-new bank.
52 fn take_scheduler(&self, context: SchedulingContext) -> InstalledSchedulerBox {
53 self.take_resumed_scheduler(context, initialized_result_with_timings())
54 }
55
56 fn take_resumed_scheduler(
57 &self,
58 context: SchedulingContext,
59 result_with_timings: ResultWithTimings,
60 ) -> InstalledSchedulerBox;
61
62 /// Registers an opaque timeout listener.
63 ///
64 /// This method and the passed `struct` called [`TimeoutListener`] are very opaque by purpose.
65 /// Specifically, it doesn't provide any way to tell which listener is semantically associated
66 /// to which particular scheduler. That's because proper _unregistration_ is omitted at the
67 /// timing of scheduler returning to reduce latency of the normal block-verification code-path,
68 /// relying on eventual stale listener clean-up by `solScCleaner`.
69 fn register_timeout_listener(&self, timeout_listener: TimeoutListener);
70}
71
/// Marker error telling that a scheduler has been aborted (see
/// [`InstalledScheduler::schedule_execution`]).
///
/// It carries no details by itself. The actual transaction error is obtained separately via
/// [`InstalledScheduler::recover_error_after_abort`].
#[derive(Debug)]
pub struct SchedulerAborted;

/// Result of scheduling a transaction: `Err(SchedulerAborted)` once the scheduler is aborted.
pub type ScheduleResult = core::result::Result<(), SchedulerAborted>;
75
76pub struct TimeoutListener {
77 callback: Box<dyn FnOnce(InstalledSchedulerPoolArc) + Sync + Send>,
78}
79
80impl TimeoutListener {
81 pub(crate) fn new(f: impl FnOnce(InstalledSchedulerPoolArc) + Sync + Send + 'static) -> Self {
82 Self {
83 callback: Box::new(f),
84 }
85 }
86
87 pub fn trigger(self, pool: InstalledSchedulerPoolArc) {
88 (self.callback)(pool);
89 }
90}
91
92impl Debug for TimeoutListener {
93 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
94 write!(f, "TimeoutListener({self:p})")
95 }
96}
97
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Schedules, executes, and commits transactions behind an encapsulated implementation.
///
/// The following chart illustrates the ownership/reference interaction between inter-dependent
/// objects across crates:
///
/// ```mermaid
/// graph TD
/// Bank["Arc#lt;Bank#gt;"]
///
/// subgraph solana-runtime[<span style="font-size: 70%">solana-runtime</span>]
/// BankForks;
/// BankWithScheduler;
/// Bank;
/// LoadExecuteAndCommitTransactions([<span style="font-size: 67%">load_execute_and_commit_transactions#lpar;#rpar;</span>]);
/// SchedulingContext;
/// InstalledSchedulerPool{{InstalledSchedulerPool}};
/// InstalledScheduler{{InstalledScheduler}};
/// end
///
/// subgraph solana-unified-scheduler-pool[<span style="font-size: 70%">solana-unified-scheduler-pool</span>]
/// SchedulerPool;
/// PooledScheduler;
/// ScheduleExecution(["schedule_execution()"]);
/// end
///
/// subgraph solana-ledger[<span style="font-size: 60%">solana-ledger</span>]
/// ExecuteBatch(["execute_batch()"]);
/// end
///
/// ScheduleExecution -. calls .-> ExecuteBatch;
/// BankWithScheduler -. dyn-calls .-> ScheduleExecution;
/// ExecuteBatch -. calls .-> LoadExecuteAndCommitTransactions;
/// linkStyle 0,1,2 stroke:gray,color:gray;
///
/// BankForks -- owns --> BankWithScheduler;
/// BankForks -- owns --> InstalledSchedulerPool;
/// BankWithScheduler -- refs --> Bank;
/// BankWithScheduler -- owns --> InstalledScheduler;
/// SchedulingContext -- refs --> Bank;
/// InstalledScheduler -- owns --> SchedulingContext;
///
/// SchedulerPool -- owns --> PooledScheduler;
/// SchedulerPool -. impls .-> InstalledSchedulerPool;
/// PooledScheduler -. impls .-> InstalledScheduler;
/// PooledScheduler -- refs --> SchedulerPool;
/// ```
#[cfg_attr(feature = "dev-context-only-utils", automock)]
// suppress false clippy complaints arising from mockall-derive:
//   warning: `#[must_use]` has no effect when applied to a struct field
#[cfg_attr(feature = "dev-context-only-utils", allow(unused_attributes))]
pub trait InstalledScheduler: Send + Sync + Debug + 'static {
    /// Returns the id of this scheduler.
    fn id(&self) -> SchedulerId;
    /// Returns the scheduling context this scheduler is currently associated with.
    fn context(&self) -> &SchedulingContext;

    /// Schedules a transaction for execution.
    ///
    /// This non-blocking function returns immediately without waiting for the actual execution.
    ///
    /// Calling this is illegal as soon as `wait_for_termination()` is called. It would result in
    /// a fatal logic error.
    ///
    /// The returned result indicates whether the scheduler has been aborted due to a
    /// previously-scheduled bad transaction, which terminates further block verification. So,
    /// almost always, the returned error isn't caused by merely scheduling the current
    /// transaction itself. Once aborted, calling this does nothing anymore, though it's still
    /// safe to do so. As soon as notified, callers are expected to stop processing upcoming
    /// transactions of the same `SchedulingContext` (i.e. the same block). Just like a
    /// non-aborted scheduler, an aborted one must still be `wait_for_termination()`-ed.
    /// Internally, it's then disposed of cleanly instead of being returned to the pool.
    ///
    /// Callers can acquire the error by calling a separate function,
    /// `recover_error_after_abort()`, which requires `&mut self` instead of `&self`. This
    /// separation and the convoluted return-value semantics above are intentional. They keep
    /// the fast code-path of normal transaction scheduling multi-threaded at the cost of a far
    /// slower error code-path, while giving implementors increased flexibility by having
    /// `&mut self`.
    fn schedule_execution(
        &self,
        transaction: RuntimeTransaction<SanitizedTransaction>,
        index: usize,
    ) -> ScheduleResult;

    /// Returns the error which caused the scheduler to abort.
    ///
    /// This must not be called until it's observed that `schedule_execution()` has returned
    /// `Err(SchedulerAborted)`. Violating this should `panic!()`.
    ///
    /// That said, calling this multiple times is completely acceptable after the error has been
    /// observed from `schedule_execution()`. While it's not guaranteed, the same `.clone()`-ed
    /// error of the first bad transaction is usually returned across invocations.
    fn recover_error_after_abort(&mut self) -> TransactionError;

    /// Waits for a scheduler to terminate after processing.
    ///
    /// This function blocks the current thread while the scheduler completes the executions of
    /// all scheduled transactions and returns the finalized `ResultWithTimings`. Even for
    /// aborted schedulers, this still blocks for a short period of time to shut the scheduler
    /// down gracefully (e.g. thread joining).
    ///
    /// Along with the returned result, this function also uninstalls the scheduler from the
    /// bank by transforming the consumed `self` into an [`UninstalledScheduler`].
    ///
    /// `is_dropped` is `true` when the wait is triggered by the bank being dropped (e.g. pruned
    /// from `BankForks`), and `false` otherwise.
    ///
    /// If no transaction is scheduled, the result and timings will be `Ok(())` and
    /// `ExecuteTimings::default()` respectively.
    fn wait_for_termination(
        self: Box<Self>,
        is_dropped: bool,
    ) -> (ResultWithTimings, UninstalledSchedulerBox);

    /// Pauses a scheduler after processing, so that the bank's recent blockhash can be updated.
    ///
    /// This function blocks the current thread like `wait_for_termination()`. However, the
    /// scheduler isn't consumed. This means the scheduler is responsible for retaining the
    /// finalized `ResultWithTimings` internally until it's `wait_for_termination()`-ed, so that
    /// the result can be collected later.
    fn pause_for_recent_blockhash(&mut self);

    /// Unpauses a block production scheduler, immediately after it's taken from the scheduler
    /// pool.
    ///
    /// This is a rather special-purpose method. Such a scheduler is initially paused due to a
    /// race condition between the poh thread and handler threads. So, it needs to be unpaused by
    /// calling this in order to start processing transactions.
    ///
    /// # Panics
    ///
    /// Panics if called on a block verification scheduler.
    fn unpause_after_taken(&self);
}
227
/// A scheduler which has been uninstalled from its bank by
/// [`InstalledScheduler::wait_for_termination`].
#[cfg_attr(feature = "dev-context-only-utils", automock)]
pub trait UninstalledScheduler: Send + Sync + Debug + 'static {
    /// Returns this scheduler to its pool.
    fn return_to_pool(self: Box<Self>);
}

/// Type-erased installed scheduler, as handed out by [`InstalledSchedulerPool`].
pub type InstalledSchedulerBox = Box<dyn InstalledScheduler>;
/// Type-erased uninstalled scheduler, as returned by
/// [`InstalledScheduler::wait_for_termination`].
pub type UninstalledSchedulerBox = Box<dyn UninstalledScheduler>;

/// Shared, type-erased handle to a scheduler pool.
pub type InstalledSchedulerPoolArc = Arc<dyn InstalledSchedulerPool>;

/// Identifier of a scheduler (see [`InstalledScheduler::id`]).
pub type SchedulerId = u64;
239
240/// A small context to propagate a bank and its scheduling mode to the scheduler subsystem.
241///
242/// Note that this isn't called `SchedulerContext` because the contexts aren't associated with
243/// schedulers one by one. A scheduler will use many SchedulingContexts during its lifetime.
244/// "Scheduling" part of the context name refers to an abstract slice of time to schedule and
245/// execute all transactions for a given bank for block verification or production. A context is
246/// expected to be used by a particular scheduler only for that duration of the time and to be
247/// disposed by the scheduler. Then, the scheduler may work on different banks with new
248/// `SchedulingContext`s.
249///
250/// There's a special construction only used for scheduler preallocation, which has no bank. Panics
251/// will be triggered when tried to be used normally across code-base.
252#[derive(Clone, Debug)]
253pub struct SchedulingContext {
254 mode: SchedulingMode,
255 bank: Option<Arc<Bank>>,
256}
257
258impl SchedulingContext {
259 pub fn for_preallocation() -> Self {
260 Self {
261 mode: SchedulingMode::BlockProduction,
262 bank: None,
263 }
264 }
265
266 #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
267 pub(crate) fn new_with_mode(mode: SchedulingMode, bank: Arc<Bank>) -> Self {
268 Self {
269 mode,
270 bank: Some(bank),
271 }
272 }
273
274 #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
275 fn for_verification(bank: Arc<Bank>) -> Self {
276 Self::new_with_mode(SchedulingMode::BlockVerification, bank)
277 }
278
279 #[cfg(feature = "dev-context-only-utils")]
280 pub fn for_production(bank: Arc<Bank>) -> Self {
281 Self::new_with_mode(SchedulingMode::BlockProduction, bank)
282 }
283
284 pub fn is_preallocated(&self) -> bool {
285 self.bank.is_none()
286 }
287
288 pub fn mode(&self) -> SchedulingMode {
289 self.mode
290 }
291
292 pub fn bank(&self) -> Option<&Arc<Bank>> {
293 self.bank.as_ref()
294 }
295
296 pub fn slot(&self) -> Option<Slot> {
297 self.bank.as_ref().map(|bank| bank.slot())
298 }
299}
300
301pub type ResultWithTimings = (Result<()>, ExecuteTimings);
302
/// A hint from the bank about why the caller is waiting on its scheduler.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum WaitReason {
    /// The bank wants its scheduler to terminate once transaction execution completes, so that
    /// the bank can freeze itself immediately afterwards. This is by far the most common
    /// reason.
    ///
    /// Note that `wait_for_termination(TerminatedToFreeze)` must be done explicitly before
    /// `Bank::freeze()`. It can't be done implicitly inside `Bank::freeze()`, so that the latter
    /// stays infallible.
    TerminatedToFreeze,
    /// Same as `TerminatedToFreeze`, additionally indicating that `Drop::drop()` is the caller.
    DroppedFromBankForks,
    /// The bank wants its scheduler to pause after completion, without it being returned to
    /// the pool. This is for updating the bank's recent blockhash. The scheduler's
    /// internally-held `ResultWithTimings` is collected later.
    PausedForRecentBlockhash,
}

impl WaitReason {
    /// Returns `true` if the scheduler should only pause (staying installed) instead of
    /// terminating.
    pub fn is_paused(&self) -> bool {
        // An exhaustive `match` is deliberately preferred over `matches!()`. Adding a new
        // variant (e.g. `PausedForFooBar`) then forces an explicit decision here.
        match *self {
            Self::TerminatedToFreeze | Self::DroppedFromBankForks => false,
            Self::PausedForRecentBlockhash => true,
        }
    }

    /// Returns `true` if the wait was triggered by dropping the bank.
    pub fn is_dropped(&self) -> bool {
        // Kept exhaustive for the same reason as in `is_paused()`.
        match *self {
            Self::TerminatedToFreeze | Self::PausedForRecentBlockhash => false,
            Self::DroppedFromBankForks => true,
        }
    }
}
341
342#[allow(clippy::large_enum_variant)]
343#[derive(Debug)]
344pub enum SchedulerStatus {
345 /// Unified scheduler is disabled or installed scheduler is consumed by
346 /// [`InstalledScheduler::wait_for_termination`]. Note that transition to [`Self::Unavailable`]
347 /// from {[`Self::Active`], [`Self::Stale`]} is one-way (i.e. one-time) unlike [`Self::Active`]
348 /// <=> [`Self::Stale`] below. Also, this variant is transiently used as a placeholder
349 /// internally when transitioning scheduler statuses, which isn't observable unless panic is
350 /// happening.
351 Unavailable,
352 /// Scheduler is installed into a bank; could be running or just be waiting for additional
353 /// transactions. This will be transitioned to [`Self::Stale`] after certain time (i.e.
354 /// `solana_unified_scheduler_pool::DEFAULT_TIMEOUT_DURATION`) has passed if its bank hasn't
355 /// been frozen since installed.
356 Active(InstalledSchedulerBox),
357 /// Scheduler has yet to freeze its associated bank even after it's taken too long since
358 /// installed, resulting in returning the scheduler back to the pool. Later, this can
359 /// immediately (i.e. transparently) be transitioned to [`Self::Active`] as soon as there's new
360 /// transaction to be executed (= [`BankWithScheduler::schedule_transaction_executions`] is
361 /// called, which internally calls [`BankWithSchedulerInner::with_active_scheduler`] to make
362 /// the transition happen).
363 Stale(InstalledSchedulerPoolArc, ResultWithTimings),
364}
365
366impl SchedulerStatus {
367 fn new(scheduler: Option<InstalledSchedulerBox>) -> Self {
368 match scheduler {
369 Some(scheduler) => SchedulerStatus::Active(scheduler),
370 None => SchedulerStatus::Unavailable,
371 }
372 }
373
374 fn transition_from_stale_to_active(
375 &mut self,
376 f: impl FnOnce(InstalledSchedulerPoolArc, ResultWithTimings) -> InstalledSchedulerBox,
377 ) {
378 let Self::Stale(pool, result_with_timings) = mem::replace(self, Self::Unavailable) else {
379 panic!("transition to Active failed: {self:?}");
380 };
381 *self = Self::Active(f(pool, result_with_timings));
382 }
383
384 fn maybe_transition_from_active_to_stale(
385 &mut self,
386 f: impl FnOnce(InstalledSchedulerBox) -> (InstalledSchedulerPoolArc, ResultWithTimings),
387 ) {
388 if !matches!(self, Self::Active(_scheduler)) {
389 return;
390 }
391 let Self::Active(scheduler) = mem::replace(self, Self::Unavailable) else {
392 unreachable!("not active: {self:?}");
393 };
394 let (pool, result_with_timings) = f(scheduler);
395 *self = Self::Stale(pool, result_with_timings);
396 }
397
398 fn transition_from_active_to_unavailable(&mut self) -> InstalledSchedulerBox {
399 let Self::Active(scheduler) = mem::replace(self, Self::Unavailable) else {
400 panic!("transition to Unavailable failed: {self:?}");
401 };
402 scheduler
403 }
404
405 fn transition_from_stale_to_unavailable(&mut self) -> ResultWithTimings {
406 let Self::Stale(_pool, result_with_timings) = mem::replace(self, Self::Unavailable) else {
407 panic!("transition to Unavailable failed: {self:?}");
408 };
409 result_with_timings
410 }
411
412 fn active_scheduler(&self) -> &InstalledSchedulerBox {
413 let SchedulerStatus::Active(active_scheduler) = self else {
414 panic!("not active: {self:?}");
415 };
416 active_scheduler
417 }
418}
419
420/// Very thin wrapper around Arc<Bank>
421///
422/// It brings type-safety against accidental mixing of bank and scheduler with different slots,
423/// which is a pretty dangerous condition. Also, it guarantees to call wait_for_termination() via
424/// ::drop() by DropBankService, which receives Vec<BankWithScheduler> from BankForks::set_root()'s
425/// pruning, mostly matching to Arc<Bank>'s lifetime by piggybacking on the pruning.
426///
427/// Semantically, a scheduler is tightly coupled with a particular bank. But scheduler wasn't put
428/// into Bank fields to avoid circular-references (a scheduler needs to refer to its accompanied
429/// Arc<Bank>). BankWithScheduler behaves almost like Arc<Bank>. It only adds a few of transaction
430/// scheduling and scheduler management functions. For this reason, `bank` variable names should be
431/// used for `BankWithScheduler` across codebase.
432///
433/// BankWithScheduler even implements Deref for convenience. And Clone is omitted to implement to
434/// avoid ambiguity as to which to clone: BankWithScheduler or Arc<Bank>. Use
435/// clone_without_scheduler() for Arc<Bank>. Otherwise, use clone_with_scheduler() (this should be
436/// unusual outside scheduler code-path)
437#[derive(Debug)]
438pub struct BankWithScheduler {
439 inner: Arc<BankWithSchedulerInner>,
440}
441
442#[derive(Debug)]
443pub struct BankWithSchedulerInner {
444 bank: Arc<Bank>,
445 scheduler: InstalledSchedulerRwLock,
446}
447pub type InstalledSchedulerRwLock = RwLock<SchedulerStatus>;
448
449impl BankWithScheduler {
450 /// Creates a new `BankWithScheduler` from bank and its associated scheduler.
451 ///
452 /// # Panics
453 ///
454 /// Panics if `scheduler`'s scheduling context is unmatched to given bank or for scheduler
455 /// preallocation.
456 #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
457 pub(crate) fn new(bank: Arc<Bank>, scheduler: Option<InstalledSchedulerBox>) -> Self {
458 // Avoid the fatal situation in which bank is being associated with a scheduler associated
459 // to a different bank!
460 if let Some(bank_in_context) = scheduler
461 .as_ref()
462 .map(|scheduler| scheduler.context().bank().unwrap())
463 {
464 assert!(Arc::ptr_eq(&bank, bank_in_context));
465 }
466
467 Self {
468 inner: Arc::new(BankWithSchedulerInner {
469 bank,
470 scheduler: RwLock::new(SchedulerStatus::new(scheduler)),
471 }),
472 }
473 }
474
475 pub fn new_without_scheduler(bank: Arc<Bank>) -> Self {
476 Self::new(bank, None)
477 }
478
479 pub fn clone_with_scheduler(&self) -> BankWithScheduler {
480 BankWithScheduler {
481 inner: self.inner.clone(),
482 }
483 }
484
485 pub fn clone_without_scheduler(&self) -> Arc<Bank> {
486 self.inner.bank.clone()
487 }
488
489 pub fn register_tick(&self, hash: &Hash) {
490 self.inner.bank.register_tick(hash, &self.inner.scheduler);
491 }
492
493 #[cfg(feature = "dev-context-only-utils")]
494 pub fn fill_bank_with_ticks_for_tests(&self) {
495 self.do_fill_bank_with_ticks_for_tests(&self.inner.scheduler);
496 }
497
498 pub fn has_installed_scheduler(&self) -> bool {
499 !matches!(
500 &*self.inner.scheduler.read().unwrap(),
501 SchedulerStatus::Unavailable
502 )
503 }
504
505 /// Schedule the transaction as long as the scheduler hasn't been aborted.
506 ///
507 /// If the scheduler has been aborted, this doesn't schedule the transaction, instead just
508 /// return the error of prior scheduled transaction.
509 ///
510 /// Calling this will panic if the installed scheduler is Unavailable (the bank is
511 /// wait_for_termination()-ed or the unified scheduler is disabled in the first place).
512 pub fn schedule_transaction_executions(
513 &self,
514 transactions_with_indexes: impl ExactSizeIterator<
515 Item = (RuntimeTransaction<SanitizedTransaction>, usize),
516 >,
517 ) -> Result<()> {
518 trace!(
519 "schedule_transaction_executions(): {} txs",
520 transactions_with_indexes.len()
521 );
522
523 let schedule_result: ScheduleResult = self.inner.with_active_scheduler(|scheduler| {
524 for (sanitized_transaction, index) in transactions_with_indexes {
525 scheduler.schedule_execution(sanitized_transaction, index)?;
526 }
527 Ok(())
528 });
529
530 if schedule_result.is_err() {
531 // This write lock isn't atomic with the above the read lock. So, another thread
532 // could have called .recover_error_after_abort() while we're literally stuck at
533 // the gaps of these locks (i.e. this comment in source code wise) under extreme
534 // race conditions. Thus, .recover_error_after_abort() is made idempotetnt for that
535 // consideration in mind.
536 //
537 // Lastly, this non-atomic nature is intentional for optimizing the fast code-path
538 return Err(self.inner.retrieve_error_after_schedule_failure());
539 }
540
541 Ok(())
542 }
543
544 #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
545 pub(crate) fn create_timeout_listener(&self) -> TimeoutListener {
546 self.inner.do_create_timeout_listener()
547 }
548
549 // take needless &mut only to communicate its semantic mutability to humans...
550 #[cfg(feature = "dev-context-only-utils")]
551 pub fn drop_scheduler(&mut self) {
552 self.inner.drop_scheduler();
553 }
554
555 pub fn unpause_new_block_production_scheduler(&self) {
556 if let SchedulerStatus::Active(scheduler) = &*self.inner.scheduler.read().unwrap() {
557 assert_matches!(scheduler.context().mode(), SchedulingMode::BlockProduction);
558 scheduler.unpause_after_taken();
559 }
560 }
561
562 pub(crate) fn wait_for_paused_scheduler(bank: &Bank, scheduler: &InstalledSchedulerRwLock) {
563 let maybe_result_with_timings = BankWithSchedulerInner::wait_for_scheduler_termination(
564 bank,
565 scheduler,
566 WaitReason::PausedForRecentBlockhash,
567 );
568 assert!(
569 maybe_result_with_timings.is_none(),
570 "Premature result was returned from scheduler after paused (slot: {})",
571 bank.slot(),
572 );
573 }
574
575 #[must_use]
576 pub fn wait_for_completed_scheduler(&self) -> Option<ResultWithTimings> {
577 BankWithSchedulerInner::wait_for_scheduler_termination(
578 &self.inner.bank,
579 &self.inner.scheduler,
580 WaitReason::TerminatedToFreeze,
581 )
582 }
583
584 pub const fn no_scheduler_available() -> InstalledSchedulerRwLock {
585 RwLock::new(SchedulerStatus::Unavailable)
586 }
587}
588
589impl BankWithSchedulerInner {
590 fn with_active_scheduler(
591 self: &Arc<Self>,
592 f: impl FnOnce(&InstalledSchedulerBox) -> ScheduleResult,
593 ) -> ScheduleResult {
594 let scheduler = self.scheduler.read().unwrap();
595 match &*scheduler {
596 SchedulerStatus::Active(scheduler) => {
597 // This is the fast path, needing single read-lock most of time.
598 f(scheduler)
599 }
600 SchedulerStatus::Stale(_pool, (result, _timings)) if result.is_err() => {
601 trace!(
602 "with_active_scheduler: bank (slot: {}) has a stale aborted scheduler...",
603 self.bank.slot(),
604 );
605 Err(SchedulerAborted)
606 }
607 SchedulerStatus::Stale(pool, _result_with_timings) => {
608 let pool = pool.clone();
609 drop(scheduler);
610
611 // Schedulers can be stale only if its mode is block-verification. So,
612 // unconditional context construction for verification is okay here.
613 let context = SchedulingContext::for_verification(self.bank.clone());
614 let mut scheduler = self.scheduler.write().unwrap();
615 trace!("with_active_scheduler: {:?}", scheduler);
616 scheduler.transition_from_stale_to_active(|pool, result_with_timings| {
617 let scheduler = pool.take_resumed_scheduler(context, result_with_timings);
618 info!(
619 "with_active_scheduler: bank (slot: {}) got active, taking scheduler (id: {})",
620 self.bank.slot(),
621 scheduler.id(),
622 );
623 scheduler
624 });
625 drop(scheduler);
626
627 let scheduler = self.scheduler.read().unwrap();
628 // Re-register a new timeout listener only after acquiring the read lock;
629 // Otherwise, the listener would again put scheduler into Stale before the read
630 // lock under an extremely-rare race condition, causing panic below in
631 // active_scheduler().
632 pool.register_timeout_listener(self.do_create_timeout_listener());
633 f(scheduler.active_scheduler())
634 }
635 SchedulerStatus::Unavailable => unreachable!("no installed scheduler"),
636 }
637 }
638
639 fn do_create_timeout_listener(self: &Arc<Self>) -> TimeoutListener {
640 let weak_bank = Arc::downgrade(self);
641 TimeoutListener::new(move |pool| {
642 let Some(bank) = weak_bank.upgrade() else {
643 // BankWithSchedulerInner is already dropped, indicating successful and timely
644 // `wait_for_termination()` on the bank prior to this triggering of the timeout,
645 // rendering this callback invocation no-op.
646 return;
647 };
648
649 let Ok(mut scheduler) = bank.scheduler.write() else {
650 // BankWithScheduler's lock is poisoned...
651 return;
652 };
653
654 // Reaching here means that it's been awhile since this active scheduler is taken from
655 // the pool and yet it has yet to be `wait_for_termination()`-ed. To avoid unbounded
656 // thread creation under forky condition, return the scheduler for now, even if the
657 // bank could process more transactions later.
658 scheduler.maybe_transition_from_active_to_stale(|scheduler| {
659 // Return the installed scheduler back to the scheduler pool as soon as the
660 // scheduler indicates the completion of all currently-scheduled transaction
661 // executions by `solana_unified_scheduler_pool::ThreadManager::end_session()`
662 // internally.
663
664 let id = scheduler.id();
665 let (result_with_timings, uninstalled_scheduler) =
666 scheduler.wait_for_termination(false);
667 uninstalled_scheduler.return_to_pool();
668 info!(
669 "timeout_listener: bank (slot: {}) got stale, returning scheduler (id: {})",
670 bank.bank.slot(),
671 id,
672 );
673 (pool, result_with_timings)
674 });
675 trace!("timeout_listener: {:?}", scheduler);
676 })
677 }
678
679 /// This must not be called until `Err(SchedulerAborted)` is observed. Violating this should
680 /// `panic!()`.
681 fn retrieve_error_after_schedule_failure(&self) -> TransactionError {
682 let mut scheduler = self.scheduler.write().unwrap();
683 match &mut *scheduler {
684 SchedulerStatus::Active(scheduler) => scheduler.recover_error_after_abort(),
685 SchedulerStatus::Stale(_pool, (result, _timings)) if result.is_err() => {
686 result.clone().unwrap_err()
687 }
688 _ => unreachable!("no error in {:?}", self.scheduler),
689 }
690 }
691
692 #[must_use]
693 fn wait_for_completed_scheduler_from_drop(&self) -> Option<ResultWithTimings> {
694 Self::wait_for_scheduler_termination(
695 &self.bank,
696 &self.scheduler,
697 WaitReason::DroppedFromBankForks,
698 )
699 }
700
701 #[must_use]
702 fn wait_for_scheduler_termination(
703 bank: &Bank,
704 scheduler: &InstalledSchedulerRwLock,
705 reason: WaitReason,
706 ) -> Option<ResultWithTimings> {
707 debug!(
708 "wait_for_scheduler_termination(slot: {}, reason: {:?}): started at {:?}...",
709 bank.slot(),
710 reason,
711 thread::current(),
712 );
713
714 let mut scheduler = scheduler.write().unwrap();
715 let (was_noop, result_with_timings) = match &mut *scheduler {
716 SchedulerStatus::Active(scheduler) if reason.is_paused() => {
717 scheduler.pause_for_recent_blockhash();
718 (false, None)
719 }
720 SchedulerStatus::Active(_scheduler) => {
721 let scheduler = scheduler.transition_from_active_to_unavailable();
722 let (result_with_timings, uninstalled_scheduler) =
723 scheduler.wait_for_termination(reason.is_dropped());
724 uninstalled_scheduler.return_to_pool();
725 (false, Some(result_with_timings))
726 }
727 SchedulerStatus::Stale(_pool, _result_with_timings) if reason.is_paused() => {
728 // Do nothing for pauses because the scheduler termination is guaranteed to be
729 // called later.
730 (true, None)
731 }
732 SchedulerStatus::Stale(_pool, _result_with_timings) => {
733 let result_with_timings = scheduler.transition_from_stale_to_unavailable();
734 (true, Some(result_with_timings))
735 }
736 SchedulerStatus::Unavailable => (true, None),
737 };
738 debug!(
739 "wait_for_scheduler_termination(slot: {}, reason: {:?}): noop: {:?}, result: {:?} at {:?}...",
740 bank.slot(),
741 reason,
742 was_noop,
743 result_with_timings.as_ref().map(|(result, _)| result),
744 thread::current(),
745 );
746 trace!(
747 "wait_for_scheduler_termination(result_with_timings: {:?})",
748 result_with_timings,
749 );
750
751 result_with_timings
752 }
753
754 fn drop_scheduler(&self) {
755 if thread::panicking() {
756 error!(
757 "BankWithSchedulerInner::drop_scheduler(): slot: {} skipping due to already panicking...",
758 self.bank.slot(),
759 );
760 return;
761 }
762
763 // There's no guarantee ResultWithTimings is available or not at all when being dropped.
764 if let Some(Err(err)) = self
765 .wait_for_completed_scheduler_from_drop()
766 .map(|(result, _timings)| result)
767 {
768 warn!(
769 "BankWithSchedulerInner::drop_scheduler(): slot: {} discarding error from scheduler: {:?}",
770 self.bank.slot(),
771 err,
772 );
773 }
774 }
775}
776
777impl Drop for BankWithSchedulerInner {
778 fn drop(&mut self) {
779 self.drop_scheduler();
780 }
781}
782
783impl Deref for BankWithScheduler {
784 type Target = Arc<Bank>;
785
786 fn deref(&self) -> &Self::Target {
787 &self.inner.bank
788 }
789}
790
// Unit tests for `BankWithScheduler`'s scheduler lifecycle (explicit wait, wait on drop, pause at
// end of slot) and for transaction scheduling. They use mockall-generated scheduler mocks.
#[cfg(test)]
mod tests {
    use {
        super::*,
        crate::{
            bank::test_utils::goto_end_of_slot_with_scheduler,
            genesis_utils::{create_genesis_config, GenesisConfigInfo},
        },
        mockall::Sequence,
        solana_system_transaction as system_transaction,
        std::sync::Mutex,
    };

    /// Builds a boxed `MockInstalledScheduler` for `bank` whose expectations share one
    /// `mockall::Sequence`, so they must be satisfied in the order in which they are registered:
    ///
    /// 1. `context()` is called once and returns `SchedulingContext::for_verification(bank)`.
    /// 2. For each flag yielded by `is_dropped_flags`, in iteration order,
    ///    `wait_for_termination(flag)` is called once. Each call returns
    ///    `(Ok(()), ExecuteTimings::default())` plus a `MockUninstalledScheduler` whose
    ///    `return_to_pool()` is expected once, in the same sequence.
    ///
    /// NOTE(review): judging from the callers, a flag is presumably `true` when termination is
    /// triggered by dropping the bank and `false` for an explicit wait.
    ///
    /// If `f` is given, it is applied to the mock last so a test can add extra expectations.
    fn setup_mocked_scheduler_with_extra(
        bank: Arc<Bank>,
        is_dropped_flags: impl Iterator<Item = bool>,
        f: Option<impl Fn(&mut MockInstalledScheduler)>,
    ) -> InstalledSchedulerBox {
        let mut mock = MockInstalledScheduler::new();
        // Shared behind `Arc<Mutex<_>>` because the `returning` closures below must also register
        // expectations on this same sequence.
        let seq = Arc::new(Mutex::new(Sequence::new()));

        mock.expect_context()
            .times(1)
            .in_sequence(&mut seq.lock().unwrap())
            .return_const(SchedulingContext::for_verification(bank));

        for wait_reason in is_dropped_flags {
            let seq_cloned = seq.clone();
            mock.expect_wait_for_termination()
                .with(mockall::predicate::eq(wait_reason))
                .times(1)
                .in_sequence(&mut seq.lock().unwrap())
                .returning(move |_| {
                    // The uninstalled-scheduler mock, and its place in the sequence, are only
                    // created once `wait_for_termination()` is actually invoked.
                    let mut mock_uninstalled = MockUninstalledScheduler::new();
                    mock_uninstalled
                        .expect_return_to_pool()
                        .times(1)
                        .in_sequence(&mut seq_cloned.lock().unwrap())
                        .returning(|| ());
                    (
                        (Ok(()), ExecuteTimings::default()),
                        Box::new(mock_uninstalled),
                    )
                });
        }

        if let Some(f) = f {
            f(&mut mock);
        }

        Box::new(mock)
    }

    /// Same as `setup_mocked_scheduler_with_extra`, but without extra expectations.
    fn setup_mocked_scheduler(
        bank: Arc<Bank>,
        is_dropped_flags: impl Iterator<Item = bool>,
    ) -> InstalledSchedulerBox {
        setup_mocked_scheduler_with_extra(
            bank,
            is_dropped_flags,
            // The turbofish only gives the unused `impl Fn` parameter a concrete type.
            None::<fn(&mut MockInstalledScheduler) -> ()>,
        )
    }

    /// An explicit wait returns the scheduler's result once and uninstalls the scheduler.
    #[test]
    fn test_scheduler_normal_termination() {
        solana_logger::setup();

        let bank = Arc::new(Bank::default_for_tests());
        let bank = BankWithScheduler::new(
            bank.clone(),
            Some(setup_mocked_scheduler(bank, [false].into_iter())),
        );
        assert!(bank.has_installed_scheduler());
        assert_matches!(bank.wait_for_completed_scheduler(), Some(_));

        // Calling wait_for_completed_scheduler() again is fine: the scheduler is no longer
        // installed, so no ResultWithTimings is returned.
        assert!(!bank.has_installed_scheduler());
        assert_matches!(bank.wait_for_completed_scheduler(), None);
    }

    /// Waiting on a bank that never had a scheduler installed yields nothing.
    #[test]
    fn test_no_scheduler_termination() {
        solana_logger::setup();

        let bank = Arc::new(Bank::default_for_tests());
        let bank = BankWithScheduler::new_without_scheduler(bank);

        // With no scheduler installed, wait_for_completed_scheduler() is a no-op.
        assert!(!bank.has_installed_scheduler());
        assert_matches!(bank.wait_for_completed_scheduler(), None);
    }

    /// Dropping the bank is expected to trigger `wait_for_termination(true)` on the mock.
    #[test]
    fn test_scheduler_termination_from_drop() {
        solana_logger::setup();

        let bank = Arc::new(Bank::default_for_tests());
        let bank = BankWithScheduler::new(
            bank.clone(),
            Some(setup_mocked_scheduler(bank, [true].into_iter())),
        );
        drop(bank);
    }

    /// Reaching the end of the slot is expected to call `pause_for_recent_blockhash()` exactly
    /// once. The scheduler is then still available to an explicit wait.
    #[test]
    fn test_scheduler_pause() {
        solana_logger::setup();

        let bank = Arc::new(crate::bank::tests::create_simple_test_bank(42));
        let bank = BankWithScheduler::new(
            bank.clone(),
            Some(setup_mocked_scheduler_with_extra(
                bank,
                [false].into_iter(),
                Some(|mocked: &mut MockInstalledScheduler| {
                    mocked
                        .expect_pause_for_recent_blockhash()
                        .times(1)
                        .returning(|| ());
                }),
            )),
        );
        goto_end_of_slot_with_scheduler(&bank);
        assert_matches!(bank.wait_for_completed_scheduler(), Some(_));
    }

    /// Schedules one transfer transaction through a mocked scheduler.
    ///
    /// When `should_succeed` is true, `schedule_execution()` returns `Ok(())` and so must
    /// `schedule_transaction_executions()`. Otherwise `schedule_execution()` returns
    /// `Err(SchedulerAborted)`, and the error produced by `recover_error_after_abort()`
    /// (`InsufficientFundsForFee`) is what `schedule_transaction_executions()` must return.
    fn do_test_schedule_execution(should_succeed: bool) {
        solana_logger::setup();

        let GenesisConfigInfo {
            genesis_config,
            mint_keypair,
            ..
        } = create_genesis_config(10_000);
        let tx0 = RuntimeTransaction::from_transaction_for_tests(system_transaction::transfer(
            &mint_keypair,
            &solana_pubkey::new_rand(),
            2,
            genesis_config.hash(),
        ));
        let bank = Arc::new(Bank::new_for_tests(&genesis_config));
        // `[true]`: no explicit wait happens below, so termination is expected to come from
        // dropping `bank` at the end of this function.
        let mocked_scheduler = setup_mocked_scheduler_with_extra(
            bank.clone(),
            [true].into_iter(),
            Some(|mocked: &mut MockInstalledScheduler| {
                if should_succeed {
                    mocked
                        .expect_schedule_execution()
                        .times(1)
                        .returning(|_, _| Ok(()));
                } else {
                    mocked
                        .expect_schedule_execution()
                        .times(1)
                        .returning(|_, _| Err(SchedulerAborted));
                    mocked
                        .expect_recover_error_after_abort()
                        .times(1)
                        .returning(|| TransactionError::InsufficientFundsForFee);
                }
            }),
        );

        let bank = BankWithScheduler::new(bank, Some(mocked_scheduler));
        // NOTE(review): the `0` paired with `tx0` is presumably its index within the batch.
        let result = bank.schedule_transaction_executions([(tx0, 0)].into_iter());
        if should_succeed {
            assert_matches!(result, Ok(()));
        } else {
            assert_matches!(result, Err(TransactionError::InsufficientFundsForFee));
        }
    }

    #[test]
    fn test_schedule_execution_success() {
        do_test_schedule_execution(true);
    }

    #[test]
    fn test_schedule_execution_failure() {
        do_test_schedule_execution(false);
    }
}