atlas_runtime/installed_scheduler_pool.rs
1//! Transaction processing glue code, mainly consisting of Object-safe traits
2//!
//! [InstalledSchedulerPool] lends out one of its pooled [InstalledScheduler]s, wrapped in a
//! [BankWithScheduler], which can be used by `ReplayStage` and `BankingStage` for transaction
//! execution. After use, the scheduler will be returned to the pool.
6//!
7//! [InstalledScheduler] can be fed with [SanitizedTransaction]s. Then, it schedules those
8//! executions and commits those results into the associated _bank_.
9//!
10//! It's generally assumed that each [InstalledScheduler] is backed by multiple threads for
11//! parallel transaction processing and there are multiple independent schedulers inside a single
12//! instance of [InstalledSchedulerPool].
13//!
14//! Dynamic dispatch was inevitable due to the desire to piggyback on
15//! [BankForks](crate::bank_forks::BankForks)'s pruning for scheduler lifecycle management as the
16//! common place both for `ReplayStage` and `BankingStage` and the resultant need of invoking
17//! actual implementations provided by the dependent crate (`atlas-unified-scheduler-pool`, which
18//! in turn depends on `atlas-ledger`, which in turn depends on `atlas-runtime`), avoiding a
19//! cyclic dependency.
20//!
21//! See [InstalledScheduler] for visualized interaction.
22
23use {
24 crate::bank::Bank,
25 assert_matches::assert_matches,
26 log::*,
27 solana_clock::Slot,
28 solana_hash::Hash,
29 solana_runtime_transaction::runtime_transaction::RuntimeTransaction,
30 solana_svm_timings::ExecuteTimings,
31 solana_transaction::sanitized::SanitizedTransaction,
32 solana_transaction_error::{TransactionError, TransactionResult as Result},
33 solana_unified_scheduler_logic::SchedulingMode,
34 std::{
35 fmt::{self, Debug},
36 mem,
37 ops::Deref,
38 sync::{Arc, RwLock},
39 thread,
40 },
41};
42#[cfg(feature = "dev-context-only-utils")]
43use {mockall::automock, qualifier_attr::qualifiers};
44
45pub fn initialized_result_with_timings() -> ResultWithTimings {
46 (Ok(()), ExecuteTimings::default())
47}
48
/// Object-safe interface of a pool which lends out [`InstalledScheduler`]s.
///
/// The pool is shared as [`InstalledSchedulerPoolArc`], hence the `Send + Sync` bounds.
pub trait InstalledSchedulerPool: Send + Sync + Debug {
    /// A very thin wrapper of [`Self::take_resumed_scheduler`] to take a scheduler from this pool
    /// for a brand-new bank.
    ///
    /// The taken scheduler starts from [`initialized_result_with_timings`].
    fn take_scheduler(&self, context: SchedulingContext) -> InstalledSchedulerBox {
        self.take_resumed_scheduler(context, initialized_result_with_timings())
    }

    /// Takes a scheduler from this pool for `context`, handing it `result_with_timings` as its
    /// starting result and timings.
    ///
    /// Within this file, this is how a stale scheduler status is revived: the result and timings
    /// retained from the previously-returned scheduler are passed back in.
    fn take_resumed_scheduler(
        &self,
        context: SchedulingContext,
        result_with_timings: ResultWithTimings,
    ) -> InstalledSchedulerBox;

    /// Registers an opaque timeout listener.
    ///
    /// This method and the passed `struct` called [`TimeoutListener`] are very opaque on purpose.
    /// Specifically, it doesn't provide any way to tell which listener is semantically associated
    /// to which particular scheduler. That's because proper _unregistration_ is omitted at the
    /// timing of scheduler returning to reduce latency of the normal block-verification code-path,
    /// relying on eventual stale listener clean-up by `solScCleaner`.
    fn register_timeout_listener(&self, timeout_listener: TimeoutListener);

    /// Consumes this shared handle to the pool.
    ///
    /// NOTE(review): judging by the name, this is presumably invoked once the pool gets
    /// uninstalled from `BankForks`; the caller isn't visible in this file -- confirm.
    fn uninstalled_from_bank_forks(self: Arc<Self>);
}
73
/// Marker error telling that the scheduler has been aborted.
///
/// It deliberately carries no detail; the actual [`TransactionError`] is retrieved separately
/// via [`InstalledScheduler::recover_error_after_abort`].
#[derive(Debug)]
pub struct SchedulerAborted;
/// Outcome of [`InstalledScheduler::schedule_execution`]: `Ok(())` or `Err(SchedulerAborted)`.
pub type ScheduleResult = std::result::Result<(), SchedulerAborted>;
77
/// An opaque, one-shot callback to be registered with
/// [`InstalledSchedulerPool::register_timeout_listener`].
pub struct TimeoutListener {
    // Boxed so that listeners wrapping different closures share a single concrete type. It's
    // `FnOnce` because `trigger()` consumes the listener.
    callback: Box<dyn FnOnce(InstalledSchedulerPoolArc) + Sync + Send>,
}
81
82impl TimeoutListener {
83 pub(crate) fn new(f: impl FnOnce(InstalledSchedulerPoolArc) + Sync + Send + 'static) -> Self {
84 Self {
85 callback: Box::new(f),
86 }
87 }
88
89 pub fn trigger(self, pool: InstalledSchedulerPoolArc) {
90 (self.callback)(pool);
91 }
92}
93
94impl Debug for TimeoutListener {
95 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
96 write!(f, "TimeoutListener({self:p})")
97 }
98}
99
100#[cfg_attr(doc, aquamarine::aquamarine)]
101/// Schedules, executes, and commits transactions under encapsulated implementation
102///
103/// The following chart illustrates the ownership/reference interaction between inter-dependent
104/// objects across crates:
105///
106/// ```mermaid
107/// graph TD
108/// Bank["Arc#lt;Bank#gt;"]
109///
110/// subgraph atlas-runtime[<span style="font-size: 70%">atlas-runtime</span>]
111/// BankForks;
112/// BankWithScheduler;
113/// Bank;
114/// LoadExecuteAndCommitTransactions([<span style="font-size: 67%">load_execute_and_commit_transactions#lpar;#rpar;</span>]);
115/// SchedulingContext;
116/// InstalledSchedulerPool{{InstalledSchedulerPool}};
117/// InstalledScheduler{{InstalledScheduler}};
118/// end
119///
120/// subgraph atlas-unified-scheduler-pool[<span style="font-size: 70%">atlas-unified-scheduler-pool</span>]
121/// SchedulerPool;
122/// PooledScheduler;
123/// ScheduleExecution(["schedule_execution()"]);
124/// end
125///
126/// subgraph atlas-ledger[<span style="font-size: 60%">atlas-ledger</span>]
127/// ExecuteBatch(["execute_batch()"]);
128/// end
129///
130/// ScheduleExecution -. calls .-> ExecuteBatch;
131/// BankWithScheduler -. dyn-calls .-> ScheduleExecution;
132/// ExecuteBatch -. calls .-> LoadExecuteAndCommitTransactions;
133/// linkStyle 0,1,2 stroke:gray,color:gray;
134///
135/// BankForks -- owns --> BankWithScheduler;
136/// BankForks -- owns --> InstalledSchedulerPool;
137/// BankWithScheduler -- refs --> Bank;
138/// BankWithScheduler -- owns --> InstalledScheduler;
139/// SchedulingContext -- refs --> Bank;
140/// InstalledScheduler -- owns --> SchedulingContext;
141///
142/// SchedulerPool -- owns --> PooledScheduler;
143/// SchedulerPool -. impls .-> InstalledSchedulerPool;
144/// PooledScheduler -. impls .-> InstalledScheduler;
145/// PooledScheduler -- refs --> SchedulerPool;
146/// ```
147#[cfg_attr(feature = "dev-context-only-utils", automock)]
148// suppress false clippy complaints arising from mockall-derive:
149// warning: `#[must_use]` has no effect when applied to a struct field
150#[cfg_attr(feature = "dev-context-only-utils", allow(unused_attributes))]
pub trait InstalledScheduler: Send + Sync + Debug + 'static {
    /// Returns the identifier of this scheduler (used for logging in this file).
    fn id(&self) -> SchedulerId;

    /// Returns the [`SchedulingContext`] this scheduler is currently working with.
    fn context(&self) -> &SchedulingContext;

    /// Schedule transaction for execution.
    ///
    /// This non-blocking function will return immediately without waiting for actual execution.
    ///
    /// Calling this is illegal as soon as `wait_for_termination()` is called. It would result in
    /// fatal logic error.
    ///
    /// Note that the returned result indicates whether the scheduler has been aborted due to a
    /// previously-scheduled bad transaction, which terminates further block verification. So,
    /// almost always, the returned error isn't due to the mere scheduling of the current
    /// transaction itself. At this point, calling this does nothing anymore while it's still safe
    /// to do. As soon as notified, callers are expected to stop processing upcoming transactions
    /// of the same `SchedulingContext` (i.e. same block). Internally, the aborted scheduler will
    /// be disposed cleanly, not repooled, after `wait_for_termination()` is called like
    /// not-aborted schedulers.
    ///
    /// Caller can acquire the error by calling a separate function called
    /// `recover_error_after_abort()`, which requires `&mut self`, instead of `&self`. This
    /// separation and the convoluted returned value semantics explained above are intentional to
    /// optimize the fast code-path of normal transaction scheduling to be multi-threaded at the
    /// cost of far slower error code-path while giving implementors increased flexibility by
    /// having &mut.
    fn schedule_execution(
        &self,
        transaction: RuntimeTransaction<SanitizedTransaction>,
        index: usize,
    ) -> ScheduleResult;

    /// Return the error which caused the scheduler to abort.
    ///
    /// Note that this must not be called until it's observed that `schedule_execution()` has
    /// returned `Err(SchedulerAborted)`. Violating this should `panic!()`.
    ///
    /// That said, calling this multiple times is completely acceptable after the error observation
    /// from `schedule_execution()`. While it's not guaranteed, the same `.clone()`-ed errors of
    /// the first bad transaction are usually returned across invocations.
    fn recover_error_after_abort(&mut self) -> TransactionError;

    /// Wait for a scheduler to terminate after processing.
    ///
    /// This function blocks the current thread while waiting for the scheduler to complete all of
    /// the executions for the scheduled transactions and to return the finalized
    /// `ResultWithTimings`. This function still blocks for a short period of time even in the case
    /// of aborted schedulers to gracefully shut down the scheduler (like thread joining).
    ///
    /// Along with the result being returned, this function also makes the scheduler itself
    /// uninstalled from the bank by transforming the consumed self.
    ///
    /// If no transaction is scheduled, the result and timing will be `Ok(())` and
    /// `ExecuteTimings::default()` respectively.
    ///
    /// Within this file, `is_dropped` is `true` only when the wait is caused by dropping the
    /// scheduler along with its bank (i.e. `WaitReason::DroppedFromBankForks`).
    fn wait_for_termination(
        self: Box<Self>,
        is_dropped: bool,
    ) -> (ResultWithTimings, UninstalledSchedulerBox);

    /// Pause a scheduler after processing to update bank's recent blockhash.
    ///
    /// This function blocks the current thread like wait_for_termination(). However, the scheduler
    /// won't be consumed. This means the scheduler is responsible for retaining the finalized
    /// `ResultWithTimings` internally until it's `wait_for_termination()`-ed to collect the result
    /// later.
    fn pause_for_recent_blockhash(&mut self);

    /// Unpause a block production scheduler, immediately after it's taken from the scheduler pool.
    ///
    /// This is rather a special-purposed method. Such a scheduler is initially paused due to a
    /// race condition between the poh thread and handler threads. So, it needs to be unpaused in
    /// order to start processing transactions by calling this.
    ///
    /// # Panics
    ///
    /// Panics if called on a block verification scheduler.
    fn unpause_after_taken(&self);
}
229
/// A scheduler which has been consumed by [`InstalledScheduler::wait_for_termination`] and is
/// thus no longer installed into a bank.
#[cfg_attr(feature = "dev-context-only-utils", automock)]
pub trait UninstalledScheduler: Send + Sync + Debug + 'static {
    /// Consumes the uninstalled scheduler to hand it back to the pool.
    fn return_to_pool(self: Box<Self>);
}
234
/// Owned, type-erased [`InstalledScheduler`].
pub type InstalledSchedulerBox = Box<dyn InstalledScheduler>;
/// Owned, type-erased [`UninstalledScheduler`].
pub type UninstalledSchedulerBox = Box<dyn UninstalledScheduler>;

/// Shared, type-erased [`InstalledSchedulerPool`].
pub type InstalledSchedulerPoolArc = Arc<dyn InstalledSchedulerPool>;

/// Identifier of a scheduler, as returned by [`InstalledScheduler::id`].
pub type SchedulerId = u64;
241
/// A small context to propagate a bank and its scheduling mode to the scheduler subsystem.
///
/// Note that this isn't called `SchedulerContext` because the contexts aren't associated with
/// schedulers one by one. A scheduler will use many SchedulingContexts during its lifetime.
/// "Scheduling" part of the context name refers to an abstract slice of time to schedule and
/// execute all transactions for a given bank for block verification or production. A context is
/// expected to be used by a particular scheduler only for that duration of the time and to be
/// disposed by the scheduler. Then, the scheduler may work on different banks with new
/// `SchedulingContext`s.
///
/// There's a special construction only used for scheduler preallocation, which has no bank.
/// Trying to use such a context normally across the code-base is expected to trigger panics.
#[derive(Clone, Debug)]
pub struct SchedulingContext {
    // Whether the scheduling is for block verification or block production.
    mode: SchedulingMode,
    // `None` only for the preallocation-purposed context (see `for_preallocation()`).
    bank: Option<Arc<Bank>>,
}
259
260impl SchedulingContext {
261 pub fn for_preallocation() -> Self {
262 Self {
263 mode: SchedulingMode::BlockProduction,
264 bank: None,
265 }
266 }
267
268 #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
269 pub(crate) fn new_with_mode(mode: SchedulingMode, bank: Arc<Bank>) -> Self {
270 Self {
271 mode,
272 bank: Some(bank),
273 }
274 }
275
276 #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
277 fn for_verification(bank: Arc<Bank>) -> Self {
278 Self::new_with_mode(SchedulingMode::BlockVerification, bank)
279 }
280
281 #[cfg(feature = "dev-context-only-utils")]
282 pub fn for_production(bank: Arc<Bank>) -> Self {
283 Self::new_with_mode(SchedulingMode::BlockProduction, bank)
284 }
285
286 pub fn is_preallocated(&self) -> bool {
287 self.bank.is_none()
288 }
289
290 pub fn mode(&self) -> SchedulingMode {
291 self.mode
292 }
293
294 pub fn bank(&self) -> Option<&Arc<Bank>> {
295 self.bank.as_ref()
296 }
297
298 pub fn slot(&self) -> Option<Slot> {
299 self.bank.as_ref().map(|bank| bank.slot())
300 }
301}
302
/// The transaction execution result accompanied by the execution timings.
pub type ResultWithTimings = (Result<()>, ExecuteTimings);
304
/// Why the bank is waiting on its scheduler; passed down as a hint.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum WaitReason {
    // The scheduler should terminate once transaction execution completes, so that the bank can
    // freeze itself right afterwards. This is by far the most common reason.
    //
    // Note that `wait_for_termination(TerminatedToFreeze)` must explicitly be done prior to
    // Bank::freeze(). It can't be done implicitly inside Bank::freeze(), which must stay
    // infallible.
    TerminatedToFreeze,
    // Same termination as `TerminatedToFreeze`, but additionally tells that Drop::drop() is the
    // caller.
    DroppedFromBankForks,
    // The scheduler should only pause after completion, without being returned to the pool. This
    // is for updating the bank's recent blockhash; the `ResultWithTimings` held inside the
    // scheduler is collected later.
    PausedForRecentBlockhash,
}

impl WaitReason {
    /// Tells whether the wait is a mere pause, as opposed to a termination.
    pub fn is_paused(&self) -> bool {
        // Every variant is spelled out on purpose (no `matches!()`, no wildcard), so that adding
        // a new variant like `PausedForFooBar` forces an explicit decision here.
        match *self {
            Self::PausedForRecentBlockhash => true,
            Self::TerminatedToFreeze => false,
            Self::DroppedFromBankForks => false,
        }
    }

    /// Tells whether the wait originates from dropping.
    pub fn is_dropped(&self) -> bool {
        // Every variant is spelled out on purpose (no `matches!()`, no wildcard), so that adding
        // a new variant like `PausedForFooBar` forces an explicit decision here.
        match *self {
            Self::DroppedFromBankForks => true,
            Self::TerminatedToFreeze => false,
            Self::PausedForRecentBlockhash => false,
        }
    }
}
343
// NOTE(review): presumably allowed because `Stale` carries a whole `ResultWithTimings` inline
// while the other variants are small; boxing it isn't done here -- confirm the rationale.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum SchedulerStatus {
    /// Unified scheduler is disabled or installed scheduler is consumed by
    /// [`InstalledScheduler::wait_for_termination`]. Note that transition to [`Self::Unavailable`]
    /// from {[`Self::Active`], [`Self::Stale`]} is one-way (i.e. one-time) unlike [`Self::Active`]
    /// <=> [`Self::Stale`] below. Also, this variant is transiently used as a placeholder
    /// internally when transitioning scheduler statuses, which isn't observable unless panic is
    /// happening.
    Unavailable,
    /// Scheduler is installed into a bank; could be running or just be waiting for additional
    /// transactions. This will be transitioned to [`Self::Stale`] after certain time (i.e.
    /// `atlas_unified_scheduler_pool::DEFAULT_TIMEOUT_DURATION`) has passed if its bank hasn't
    /// been frozen since installed.
    Active(InstalledSchedulerBox),
    /// Scheduler has yet to freeze its associated bank even after it's taken too long since
    /// installed, resulting in returning the scheduler back to the pool. Later, this can
    /// immediately (i.e. transparently) be transitioned to [`Self::Active`] as soon as there's new
    /// transaction to be executed (= [`BankWithScheduler::schedule_transaction_executions`] is
    /// called, which internally calls [`BankWithSchedulerInner::with_active_scheduler`] to make
    /// the transition happen).
    ///
    /// The pool to take a scheduler from again and the result and timings accumulated so far are
    /// retained here.
    Stale(InstalledSchedulerPoolArc, ResultWithTimings),
}
367
368impl SchedulerStatus {
369 fn new(scheduler: Option<InstalledSchedulerBox>) -> Self {
370 match scheduler {
371 Some(scheduler) => SchedulerStatus::Active(scheduler),
372 None => SchedulerStatus::Unavailable,
373 }
374 }
375
376 fn transition_from_stale_to_active(
377 &mut self,
378 f: impl FnOnce(InstalledSchedulerPoolArc, ResultWithTimings) -> InstalledSchedulerBox,
379 ) {
380 let Self::Stale(pool, result_with_timings) = mem::replace(self, Self::Unavailable) else {
381 panic!("transition to Active failed: {self:?}");
382 };
383 *self = Self::Active(f(pool, result_with_timings));
384 }
385
386 fn maybe_transition_from_active_to_stale(
387 &mut self,
388 f: impl FnOnce(InstalledSchedulerBox) -> (InstalledSchedulerPoolArc, ResultWithTimings),
389 ) {
390 if !matches!(self, Self::Active(_scheduler)) {
391 return;
392 }
393 let Self::Active(scheduler) = mem::replace(self, Self::Unavailable) else {
394 unreachable!("not active: {self:?}");
395 };
396 let (pool, result_with_timings) = f(scheduler);
397 *self = Self::Stale(pool, result_with_timings);
398 }
399
400 fn transition_from_active_to_unavailable(&mut self) -> InstalledSchedulerBox {
401 let Self::Active(scheduler) = mem::replace(self, Self::Unavailable) else {
402 panic!("transition to Unavailable failed: {self:?}");
403 };
404 scheduler
405 }
406
407 fn transition_from_stale_to_unavailable(&mut self) -> ResultWithTimings {
408 let Self::Stale(_pool, result_with_timings) = mem::replace(self, Self::Unavailable) else {
409 panic!("transition to Unavailable failed: {self:?}");
410 };
411 result_with_timings
412 }
413
414 fn active_scheduler(&self) -> &InstalledSchedulerBox {
415 let SchedulerStatus::Active(active_scheduler) = self else {
416 panic!("not active: {self:?}");
417 };
418 active_scheduler
419 }
420}
421
/// Very thin wrapper around Arc<Bank>
///
/// It brings type-safety against accidental mixing of bank and scheduler with different slots,
/// which is a pretty dangerous condition. Also, it guarantees to call wait_for_termination() via
/// ::drop() by DropBankService, which receives Vec<BankWithScheduler> from BankForks::set_root()'s
/// pruning, mostly matching to Arc<Bank>'s lifetime by piggybacking on the pruning.
///
/// Semantically, a scheduler is tightly coupled with a particular bank. But scheduler wasn't put
/// into Bank fields to avoid circular-references (a scheduler needs to refer to its accompanied
/// Arc<Bank>). BankWithScheduler behaves almost like Arc<Bank>. It only adds a few transaction
/// scheduling and scheduler management functions. For this reason, `bank` variable names should be
/// used for `BankWithScheduler` across codebase.
///
/// BankWithScheduler even implements Deref for convenience. And Clone is intentionally not
/// implemented to avoid ambiguity as to which to clone: BankWithScheduler or Arc<Bank>. Use
/// clone_without_scheduler() for Arc<Bank>. Otherwise, use clone_with_scheduler() (this should be
/// unusual outside scheduler code-path)
#[derive(Debug)]
pub struct BankWithScheduler {
    // Shared so that clone_with_scheduler() hands out handles to the very same bank and
    // scheduler status.
    inner: Arc<BankWithSchedulerInner>,
}
443
/// The shared state behind [`BankWithScheduler`]: a bank and the status of its scheduler.
#[derive(Debug)]
pub struct BankWithSchedulerInner {
    bank: Arc<Bank>,
    // Lock-guarded so that the status can be transitioned through shared references.
    scheduler: InstalledSchedulerRwLock,
}
/// The lock guarding the [`SchedulerStatus`] of a bank.
pub type InstalledSchedulerRwLock = RwLock<SchedulerStatus>;
450
451impl BankWithScheduler {
452 /// Creates a new `BankWithScheduler` from bank and its associated scheduler.
453 ///
454 /// # Panics
455 ///
456 /// Panics if `scheduler`'s scheduling context is unmatched to given bank or for scheduler
457 /// preallocation.
458 #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
459 pub(crate) fn new(bank: Arc<Bank>, scheduler: Option<InstalledSchedulerBox>) -> Self {
460 // Avoid the fatal situation in which bank is being associated with a scheduler associated
461 // to a different bank!
462 if let Some(bank_in_context) = scheduler
463 .as_ref()
464 .map(|scheduler| scheduler.context().bank().unwrap())
465 {
466 assert!(Arc::ptr_eq(&bank, bank_in_context));
467 }
468
469 Self {
470 inner: Arc::new(BankWithSchedulerInner {
471 bank,
472 scheduler: RwLock::new(SchedulerStatus::new(scheduler)),
473 }),
474 }
475 }
476
477 pub fn new_without_scheduler(bank: Arc<Bank>) -> Self {
478 Self::new(bank, None)
479 }
480
481 pub fn clone_with_scheduler(&self) -> BankWithScheduler {
482 BankWithScheduler {
483 inner: self.inner.clone(),
484 }
485 }
486
487 pub fn clone_without_scheduler(&self) -> Arc<Bank> {
488 self.inner.bank.clone()
489 }
490
491 pub fn register_tick(&self, hash: &Hash) {
492 self.inner.bank.register_tick(hash, &self.inner.scheduler);
493 }
494
495 #[cfg(feature = "dev-context-only-utils")]
496 pub fn fill_bank_with_ticks_for_tests(&self) {
497 self.do_fill_bank_with_ticks_for_tests(&self.inner.scheduler);
498 }
499
500 pub fn has_installed_scheduler(&self) -> bool {
501 !matches!(
502 &*self.inner.scheduler.read().unwrap(),
503 SchedulerStatus::Unavailable
504 )
505 }
506
507 /// Schedule the transaction as long as the scheduler hasn't been aborted.
508 ///
509 /// If the scheduler has been aborted, this doesn't schedule the transaction, instead just
510 /// return the error of prior scheduled transaction.
511 ///
512 /// Calling this will panic if the installed scheduler is Unavailable (the bank is
513 /// wait_for_termination()-ed or the unified scheduler is disabled in the first place).
514 pub fn schedule_transaction_executions(
515 &self,
516 transactions_with_indexes: impl ExactSizeIterator<
517 Item = (RuntimeTransaction<SanitizedTransaction>, usize),
518 >,
519 ) -> Result<()> {
520 trace!(
521 "schedule_transaction_executions(): {} txs",
522 transactions_with_indexes.len()
523 );
524
525 let schedule_result: ScheduleResult = self.inner.with_active_scheduler(|scheduler| {
526 for (sanitized_transaction, index) in transactions_with_indexes {
527 scheduler.schedule_execution(sanitized_transaction, index)?;
528 }
529 Ok(())
530 });
531
532 if schedule_result.is_err() {
533 // This write lock isn't atomic with the above the read lock. So, another thread
534 // could have called .recover_error_after_abort() while we're literally stuck at
535 // the gaps of these locks (i.e. this comment in source code wise) under extreme
536 // race conditions. Thus, .recover_error_after_abort() is made idempotetnt for that
537 // consideration in mind.
538 //
539 // Lastly, this non-atomic nature is intentional for optimizing the fast code-path
540 return Err(self.inner.retrieve_error_after_schedule_failure());
541 }
542
543 Ok(())
544 }
545
546 #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
547 pub(crate) fn create_timeout_listener(&self) -> TimeoutListener {
548 self.inner.do_create_timeout_listener()
549 }
550
551 // take needless &mut only to communicate its semantic mutability to humans...
552 #[cfg(feature = "dev-context-only-utils")]
553 pub fn drop_scheduler(&mut self) {
554 self.inner.drop_scheduler();
555 }
556
557 pub fn unpause_new_block_production_scheduler(&self) {
558 if let SchedulerStatus::Active(scheduler) = &*self.inner.scheduler.read().unwrap() {
559 assert_matches!(scheduler.context().mode(), SchedulingMode::BlockProduction);
560 scheduler.unpause_after_taken();
561 }
562 }
563
564 pub(crate) fn wait_for_paused_scheduler(bank: &Bank, scheduler: &InstalledSchedulerRwLock) {
565 let maybe_result_with_timings = BankWithSchedulerInner::wait_for_scheduler_termination(
566 bank,
567 scheduler,
568 WaitReason::PausedForRecentBlockhash,
569 );
570 assert!(
571 maybe_result_with_timings.is_none(),
572 "Premature result was returned from scheduler after paused (slot: {})",
573 bank.slot(),
574 );
575 }
576
577 #[must_use]
578 pub fn wait_for_completed_scheduler(&self) -> Option<ResultWithTimings> {
579 BankWithSchedulerInner::wait_for_scheduler_termination(
580 &self.inner.bank,
581 &self.inner.scheduler,
582 WaitReason::TerminatedToFreeze,
583 )
584 }
585
586 pub const fn no_scheduler_available() -> InstalledSchedulerRwLock {
587 RwLock::new(SchedulerStatus::Unavailable)
588 }
589}
590
impl BankWithSchedulerInner {
    /// Runs `f` against the active scheduler, reviving a stale scheduler beforehand if needed.
    ///
    /// * `Active`: `f` is called right away under a single read lock.
    /// * `Stale` with an error result: `f` isn't called and `Err(SchedulerAborted)` is returned.
    /// * `Stale` otherwise: a scheduler is taken again from the retained pool with the retained
    ///   result and timings, a new timeout listener is registered, and then `f` is called.
    ///
    /// # Panics
    ///
    /// Panics if the status is `Unavailable` or if the status lock is poisoned.
    fn with_active_scheduler(
        self: &Arc<Self>,
        f: impl FnOnce(&InstalledSchedulerBox) -> ScheduleResult,
    ) -> ScheduleResult {
        let scheduler = self.scheduler.read().unwrap();
        match &*scheduler {
            SchedulerStatus::Active(scheduler) => {
                // This is the fast path, needing single read-lock most of time.
                f(scheduler)
            }
            SchedulerStatus::Stale(_pool, (result, _timings)) if result.is_err() => {
                trace!(
                    "with_active_scheduler: bank (slot: {}) has a stale aborted scheduler...",
                    self.bank.slot(),
                );
                Err(SchedulerAborted)
            }
            SchedulerStatus::Stale(pool, _result_with_timings) => {
                // Keep our own handle to the pool, as the read guard borrowing it must be
                // released before the write lock below can be taken.
                let pool = pool.clone();
                drop(scheduler);

                // Schedulers can be stale only if its mode is block-verification. So,
                // unconditional context construction for verification is okay here.
                let context = SchedulingContext::for_verification(self.bank.clone());
                let mut scheduler = self.scheduler.write().unwrap();
                trace!("with_active_scheduler: {scheduler:?}");
                scheduler.transition_from_stale_to_active(|pool, result_with_timings| {
                    let scheduler = pool.take_resumed_scheduler(context, result_with_timings);
                    info!(
                        "with_active_scheduler: bank (slot: {}) got active, taking scheduler (id: \
                         {})",
                        self.bank.slot(),
                        scheduler.id(),
                    );
                    scheduler
                });
                drop(scheduler);

                let scheduler = self.scheduler.read().unwrap();
                // Re-register a new timeout listener only after acquiring the read lock;
                // Otherwise, the listener would again put scheduler into Stale before the read
                // lock under an extremely-rare race condition, causing panic below in
                // active_scheduler().
                pool.register_timeout_listener(self.do_create_timeout_listener());
                f(scheduler.active_scheduler())
            }
            SchedulerStatus::Unavailable => unreachable!("no installed scheduler"),
        }
    }

    /// Creates a listener which, when triggered, returns the still-active scheduler to the pool
    /// and marks the status as `Stale`.
    ///
    /// The listener holds only a weak reference to `self`, so it doesn't keep the bank alive. It
    /// silently does nothing if `self` is already dropped, if the status lock is poisoned, or if
    /// the status isn't `Active` anymore.
    fn do_create_timeout_listener(self: &Arc<Self>) -> TimeoutListener {
        let weak_bank = Arc::downgrade(self);
        TimeoutListener::new(move |pool| {
            let Some(bank) = weak_bank.upgrade() else {
                // BankWithSchedulerInner is already dropped, indicating successful and timely
                // `wait_for_termination()` on the bank prior to this triggering of the timeout,
                // rendering this callback invocation no-op.
                return;
            };

            let Ok(mut scheduler) = bank.scheduler.write() else {
                // BankWithScheduler's lock is poisoned...
                return;
            };

            // Reaching here means that it's been awhile since this active scheduler is taken from
            // the pool and yet it has yet to be `wait_for_termination()`-ed. To avoid unbounded
            // thread creation under forky condition, return the scheduler for now, even if the
            // bank could process more transactions later.
            scheduler.maybe_transition_from_active_to_stale(|scheduler| {
                // Return the installed scheduler back to the scheduler pool as soon as the
                // scheduler indicates the completion of all currently-scheduled transaction
                // executions by `atlas_unified_scheduler_pool::ThreadManager::end_session()`
                // internally.

                // Grab the id in advance for logging, as the scheduler is consumed just below.
                let id = scheduler.id();
                let (result_with_timings, uninstalled_scheduler) =
                    scheduler.wait_for_termination(false);
                uninstalled_scheduler.return_to_pool();
                info!(
                    "timeout_listener: bank (slot: {}) got stale, returning scheduler (id: {})",
                    bank.bank.slot(),
                    id,
                );
                (pool, result_with_timings)
            });
            trace!("timeout_listener: {scheduler:?}");
        })
    }

    /// Returns the error which made the scheduling fail, either from the active scheduler or from
    /// the result retained by the stale status.
    ///
    /// This must not be called until `Err(SchedulerAborted)` is observed. Violating this should
    /// `panic!()`.
    fn retrieve_error_after_schedule_failure(&self) -> TransactionError {
        // The write lock is needed because recover_error_after_abort() takes `&mut self`.
        let mut scheduler = self.scheduler.write().unwrap();
        match &mut *scheduler {
            SchedulerStatus::Active(scheduler) => scheduler.recover_error_after_abort(),
            SchedulerStatus::Stale(_pool, (result, _timings)) if result.is_err() => {
                result.clone().unwrap_err()
            }
            _ => unreachable!("no error in {:?}", self.scheduler),
        }
    }

    /// Waits for the scheduler termination with `WaitReason::DroppedFromBankForks`.
    #[must_use]
    fn wait_for_completed_scheduler_from_drop(&self) -> Option<ResultWithTimings> {
        Self::wait_for_scheduler_termination(
            &self.bank,
            &self.scheduler,
            WaitReason::DroppedFromBankForks,
        )
    }

    /// Waits for the scheduler to pause or terminate according to `reason`, under the write lock
    /// of `scheduler`.
    ///
    /// For pauses, `None` is always returned: an active scheduler is paused in place and other
    /// statuses are left as they are. For terminations, the status becomes `Unavailable` and the
    /// final result with timings is returned, after returning the active scheduler to the pool if
    /// there's one. `None` is returned as well if the status is already `Unavailable`.
    ///
    /// `bank` is used only for logging.
    #[must_use]
    fn wait_for_scheduler_termination(
        bank: &Bank,
        scheduler: &InstalledSchedulerRwLock,
        reason: WaitReason,
    ) -> Option<ResultWithTimings> {
        debug!(
            "wait_for_scheduler_termination(slot: {}, reason: {:?}): started at {:?}...",
            bank.slot(),
            reason,
            thread::current(),
        );

        let mut scheduler = scheduler.write().unwrap();
        // `was_noop` (logged below) tells that no installed scheduler needed to be waited on.
        let (was_noop, result_with_timings) = match &mut *scheduler {
            SchedulerStatus::Active(scheduler) if reason.is_paused() => {
                scheduler.pause_for_recent_blockhash();
                (false, None)
            }
            SchedulerStatus::Active(_scheduler) => {
                let scheduler = scheduler.transition_from_active_to_unavailable();
                let (result_with_timings, uninstalled_scheduler) =
                    scheduler.wait_for_termination(reason.is_dropped());
                uninstalled_scheduler.return_to_pool();
                (false, Some(result_with_timings))
            }
            SchedulerStatus::Stale(_pool, _result_with_timings) if reason.is_paused() => {
                // Do nothing for pauses because the scheduler termination is guaranteed to be
                // called later.
                (true, None)
            }
            SchedulerStatus::Stale(_pool, _result_with_timings) => {
                // The scheduler was already returned to the pool when getting stale; so, only
                // the retained result needs to be taken out.
                let result_with_timings = scheduler.transition_from_stale_to_unavailable();
                (true, Some(result_with_timings))
            }
            SchedulerStatus::Unavailable => (true, None),
        };
        debug!(
            "wait_for_scheduler_termination(slot: {}, reason: {:?}): noop: {:?}, result: {:?} at \
             {:?}...",
            bank.slot(),
            reason,
            was_noop,
            result_with_timings.as_ref().map(|(result, _)| result),
            thread::current(),
        );
        trace!("wait_for_scheduler_termination(result_with_timings: {result_with_timings:?})",);

        result_with_timings
    }

    /// Terminates the scheduler as part of dropping, merely logging the error if there's any.
    ///
    /// This is skipped altogether (with an error log) if the current thread is already
    /// panicking.
    fn drop_scheduler(&self) {
        if thread::panicking() {
            error!(
                "BankWithSchedulerInner::drop_scheduler(): slot: {} skipping due to already \
                 panicking...",
                self.bank.slot(),
            );
            return;
        }

        // There's no guarantee ResultWithTimings is available or not at all when being dropped.
        if let Some(Err(err)) = self
            .wait_for_completed_scheduler_from_drop()
            .map(|(result, _timings)| result)
        {
            warn!(
                "BankWithSchedulerInner::drop_scheduler(): slot: {} discarding error from \
                 scheduler: {:?}",
                self.bank.slot(),
                err,
            );
        }
    }
}
779
780impl Drop for BankWithSchedulerInner {
781 fn drop(&mut self) {
782 self.drop_scheduler();
783 }
784}
785
786impl Deref for BankWithScheduler {
787 type Target = Arc<Bank>;
788
789 fn deref(&self) -> &Self::Target {
790 &self.inner.bank
791 }
792}
793
794#[cfg(test)]
795mod tests {
796 use {
797 super::*,
798 crate::{
799 bank::test_utils::goto_end_of_slot_with_scheduler,
800 genesis_utils::{create_genesis_config, GenesisConfigInfo},
801 },
802 mockall::Sequence,
803 solana_system_transaction as system_transaction,
804 std::sync::Mutex,
805 };
806
/// Builds a mocked scheduler bound to `bank`.
///
/// The mock expects, strictly in this order: a single `context()` call, and then, for each
/// item of `is_dropped_flags`, a single `wait_for_termination()` call with that very flag,
/// followed by a single `return_to_pool()` call on the uninstalled scheduler it returns.
///
/// `f`, if given, is called at last to add extra expectations to the mock.
fn setup_mocked_scheduler_with_extra(
    bank: Arc<Bank>,
    is_dropped_flags: impl Iterator<Item = bool>,
    f: Option<impl Fn(&mut MockInstalledScheduler)>,
) -> InstalledSchedulerBox {
    let mut mock = MockInstalledScheduler::new();
    // The sequence is shared with the `returning` closures below (hence Arc<Mutex<_>>), so that
    // return_to_pool() is ordered along with the expectations on the installed scheduler.
    let seq = Arc::new(Mutex::new(Sequence::new()));

    mock.expect_context()
        .times(1)
        .in_sequence(&mut seq.lock().unwrap())
        .return_const(SchedulingContext::for_verification(bank));

    // Note that each item (`wait_reason`) is the `is_dropped` flag expected to be passed.
    for wait_reason in is_dropped_flags {
        let seq_cloned = seq.clone();
        mock.expect_wait_for_termination()
            .with(mockall::predicate::eq(wait_reason))
            .times(1)
            .in_sequence(&mut seq.lock().unwrap())
            .returning(move |_| {
                let mut mock_uninstalled = MockUninstalledScheduler::new();
                mock_uninstalled
                    .expect_return_to_pool()
                    .times(1)
                    .in_sequence(&mut seq_cloned.lock().unwrap())
                    .returning(|| ());
                (
                    (Ok(()), ExecuteTimings::default()),
                    Box::new(mock_uninstalled),
                )
            });
    }

    if let Some(f) = f {
        f(&mut mock);
    }

    Box::new(mock)
}
846
847 fn setup_mocked_scheduler(
848 bank: Arc<Bank>,
849 is_dropped_flags: impl Iterator<Item = bool>,
850 ) -> InstalledSchedulerBox {
851 setup_mocked_scheduler_with_extra(
852 bank,
853 is_dropped_flags,
854 None::<fn(&mut MockInstalledScheduler) -> ()>,
855 )
856 }
857
/// Explicitly waiting for an installed scheduler yields a result once and uninstalls it.
#[test]
fn test_scheduler_normal_termination() {
    solana_logger::setup();

    let plain_bank = Arc::new(Bank::default_for_tests());
    // The mock expects exactly one wait_for_termination(false) call.
    let scheduler = setup_mocked_scheduler(Arc::clone(&plain_bank), [false].into_iter());
    let bank = BankWithScheduler::new(plain_bank, Some(scheduler));

    assert!(bank.has_installed_scheduler());
    assert_matches!(bank.wait_for_completed_scheduler(), Some(_));

    // Waiting again is harmless: the scheduler is gone by now, so there is no
    // ResultWithTimings left to hand out.
    assert!(!bank.has_installed_scheduler());
    assert_matches!(bank.wait_for_completed_scheduler(), None);
}
875
/// Waiting on a bank that never had a scheduler installed is a no-op returning `None`.
#[test]
fn test_no_scheduler_termination() {
    solana_logger::setup();

    let plain_bank = Arc::new(Bank::default_for_tests());
    let bank = BankWithScheduler::new_without_scheduler(plain_bank);

    // With nothing installed, there is nothing to wait for.
    assert!(!bank.has_installed_scheduler());
    assert_matches!(bank.wait_for_completed_scheduler(), None);
}
887
/// Dropping a bank with a still-installed scheduler must satisfy the mock's expectation of
/// a single `wait_for_termination(true)` call.
#[test]
fn test_scheduler_termination_from_drop() {
    solana_logger::setup();

    let plain_bank = Arc::new(Bank::default_for_tests());
    let scheduler = setup_mocked_scheduler(Arc::clone(&plain_bank), [true].into_iter());
    let bank_with_scheduler = BankWithScheduler::new(plain_bank, Some(scheduler));

    // No explicit wait here; the drop itself is what's under test.
    drop(bank_with_scheduler);
}
899
/// Going to the end of the slot with a scheduler installed must hit the mock's
/// `pause_for_recent_blockhash()` expectation exactly once before the scheduler is waited on.
#[test]
fn test_scheduler_pause() {
    solana_logger::setup();

    let plain_bank = Arc::new(crate::bank::tests::create_simple_test_bank(42));
    let expect_single_pause = |mocked: &mut MockInstalledScheduler| {
        mocked
            .expect_pause_for_recent_blockhash()
            .times(1)
            .returning(|| ());
    };
    let scheduler = setup_mocked_scheduler_with_extra(
        Arc::clone(&plain_bank),
        [false].into_iter(),
        Some(expect_single_pause),
    );
    let bank = BankWithScheduler::new(plain_bank, Some(scheduler));

    // Presumably this is what triggers the pause on the installed scheduler.
    goto_end_of_slot_with_scheduler(&bank);
    assert_matches!(bank.wait_for_completed_scheduler(), Some(_));
}
921
922 fn do_test_schedule_execution(should_succeed: bool) {
923 solana_logger::setup();
924
925 let GenesisConfigInfo {
926 genesis_config,
927 mint_keypair,
928 ..
929 } = create_genesis_config(10_000);
930 let tx0 = RuntimeTransaction::from_transaction_for_tests(system_transaction::transfer(
931 &mint_keypair,
932 &solana_pubkey::new_rand(),
933 2,
934 genesis_config.hash(),
935 ));
936 let bank = Arc::new(Bank::new_for_tests(&genesis_config));
937 let mocked_scheduler = setup_mocked_scheduler_with_extra(
938 bank.clone(),
939 [true].into_iter(),
940 Some(|mocked: &mut MockInstalledScheduler| {
941 if should_succeed {
942 mocked
943 .expect_schedule_execution()
944 .times(1)
945 .returning(|_, _| Ok(()));
946 } else {
947 mocked
948 .expect_schedule_execution()
949 .times(1)
950 .returning(|_, _| Err(SchedulerAborted));
951 mocked
952 .expect_recover_error_after_abort()
953 .times(1)
954 .returning(|| TransactionError::InsufficientFundsForFee);
955 }
956 }),
957 );
958
959 let bank = BankWithScheduler::new(bank, Some(mocked_scheduler));
960 let result = bank.schedule_transaction_executions([(tx0, 0)].into_iter());
961 if should_succeed {
962 assert_matches!(result, Ok(()));
963 } else {
964 assert_matches!(result, Err(TransactionError::InsufficientFundsForFee));
965 }
966 }
967
/// Runs the shared schedule-execution scenario with a succeeding mocked scheduler.
#[test]
fn test_schedule_execution_success() {
    let should_succeed = true;
    do_test_schedule_execution(should_succeed);
}
972
/// Runs the shared schedule-execution scenario with an aborting mocked scheduler.
#[test]
fn test_schedule_execution_failure() {
    let should_succeed = false;
    do_test_schedule_execution(should_succeed);
}
977}