solana_unified_scheduler_logic/
lib.rs

#![cfg_attr(
    not(feature = "agave-unstable-api"),
    deprecated(
        since = "3.1.0",
        note = "This crate has been marked for formal inclusion in the Agave Unstable API. From \
                v4.0.0 onward, the `agave-unstable-api` crate feature must be specified to \
                acknowledge use of an interface that may break without warning."
    )
)]
#![allow(rustdoc::private_intra_doc_links)]
//! The task (transaction) scheduling code for the unified scheduler
//!
//! ### High-level API and design
//!
//! The most important type is [`SchedulingStateMachine`]. It takes new tasks (= transactions) and
//! may return them back if they are runnable via
//! [`::schedule_task()`](SchedulingStateMachine::schedule_task) while maintaining the account
//! readonly/writable lock rules. Those returned runnable tasks are guaranteed to be safe to
//! execute in parallel. Lastly, `SchedulingStateMachine` should be notified about the completion
//! of the execution via [`::deschedule_task()`](SchedulingStateMachine::deschedule_task), so that
//! conflicting tasks can be returned from
//! [`::schedule_next_unblocked_task()`](SchedulingStateMachine::schedule_next_unblocked_task) as
//! newly-unblocked runnable ones.
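//!
//! Below is a minimal sketch of that call flow. The `state_machine`, `usage_queue_loader`, and
//! `execute` values are assumed to be provided by the caller (e.g. the scheduler pool); the
//! constructor of [`SchedulingStateMachine`] is intentionally not shown here.
//!
//! ```rust,ignore
//! // Wrap the transaction together with its preloaded UsageQueues.
//! let task = SchedulingStateMachine::create_task(transaction, 0, &mut usage_queue_loader);
//!
//! if let Some(task) = state_machine.schedule_task(task) {
//!     // Non-conflicting: safe to run now, possibly in parallel with other returned tasks.
//!     execute(&task);
//!     state_machine.deschedule_task(&task);
//! }
//!
//! // Conflicting tasks become runnable once their blockers are descheduled.
//! while let Some(unblocked) = state_machine.schedule_next_unblocked_task() {
//!     execute(&unblocked);
//!     state_machine.deschedule_task(&unblocked);
//! }
//! ```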
//!
//! The design principle of this crate (`solana-unified-scheduler-logic`) is simplicity for the
//! sake of separation of concerns. `solana-unified-scheduler-pool` interacts with it only through
//! a small public API. This crate knows nothing about banks, slots, solana-runtime, threads, or
//! crossbeam-channel at all. Because of this, it's deterministic, easy to unit-test, and its perf
//! footprint is well understood. It really focuses on its single job: sorting transactions into
//! an executable order.
//!
//! ### Algorithm
//!
//! The algorithm is based on per-address FIFO queues, which are updated both when a new task
//! arrives (= called _scheduling_) and when a runnable (= _post-scheduling_) task finishes (=
//! called _descheduling_).
//!
//! For the _non-conflicting scheduling_ case, the story is very simple; it just remembers that
//! all of the accessed addresses are write-locked or read-locked, along with the number of active
//! (= _currently-scheduled-and-not-descheduled-yet_) tasks. Correspondingly, descheduling does the
//! opposite book-keeping process, regardless of whether the finished task was conflicting or not.
//!
//! For the _conflicting scheduling_ case, it records each of the **non-conflicting addresses**
//! like in the non-conflicting case above. As for the **conflicting addresses**, each task is
//! recorded in the respective FIFO queues attached to the (conflicting) addresses. Importantly,
//! the number of conflicting addresses of the conflicting task is also remembered.
//!
//! The last missing piece is that the scheduler actually tries to reschedule previously blocked
//! tasks while descheduling, in addition to the above-mentioned book-keeping. Namely, when a given
//! address becomes ready for fresh locking as a result of descheduling a task (i.e. the write
//! lock is released or the read lock count has reached zero), it pops out the first element of the
//! FIFO blocked-task queue of the address. Then, it immediately marks the address as relocked. It
//! also decrements the number of conflicting addresses of the popped-out task. As the final step,
//! if that number reaches zero, the task has fully finished locking all of its addresses and is
//! directly routed to be runnable. Lastly, if the next first element of the blocked-task queue is
//! trying to read-lock the address like the popped-out one, this rescheduling is repeated as an
//! optimization to increase parallelism of task execution.
//!
//! Put differently, this algorithm tries to gradually lock all of a task's addresses at different
//! timings while not deviating the execution order from the original task ingestion order. This
//! implies there are no locking retries in general, which are the primary source of non-linear
//! perf degradation.
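//!
//! The following sketch illustrates the algorithm with three hypothetical tasks: `tx1` and `tx2`
//! both write-lock account `A`, while `tx3` only touches an unrelated account `B`. All variable
//! names here are illustrative assumptions.
//!
//! ```rust,ignore
//! let t1 = state_machine.schedule_task(tx1_task); // Some(_): `A` is now write-locked by tx1
//! let t2 = state_machine.schedule_task(tx2_task); // None: blocked, queued on A's per-address FIFO
//! let t3 = state_machine.schedule_task(tx3_task); // Some(_): no conflict on `B`
//!
//! state_machine.deschedule_task(&t1.unwrap());    // releases `A`, immediately relocks it for tx2
//! let t2 = state_machine.schedule_next_unblocked_task(); // Some(_): tx2 is now runnable
//! ```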
//!
//! As a ballpark number from a synthesized micro benchmark on a typical CPU for `mainnet-beta`
//! validators, it takes roughly 100ns to schedule and deschedule a transaction with 10 accounts,
//! and roughly 1us for a transaction with 100 accounts. Note that this entirely excludes crossbeam
//! communication overhead. That said, it's not unrealistic to say the whole unified scheduler can
//! attain 100k-1m tps overall, assuming those transaction executions aren't bottlenecked.
//!
//! ### Runtime performance characteristics and data structure arrangement
//!
//! Its algorithm is very fast for high throughput and real-time for low latency. The whole
//! unified-scheduler architecture is designed from the ground up to support the fastest execution
//! of this scheduling code. To that end, the unified scheduler pre-loads address-specific locking
//! state data structures (called [`UsageQueue`]) for all of a transaction's accounts, in order to
//! offload the job from the scheduler thread to other threads. This preloading is done inside
//! [`create_task()`](SchedulingStateMachine::create_task). In this way, task scheduling
//! computational complexity is basically reduced to several word-sized loads and stores in the
//! scheduler thread (i.e. constant; no allocations nor syscalls), while being proportional to the
//! number of addresses in a given transaction. Note that this statement holds true regardless of
//! conflicts. This is because the preloading also pre-allocates some scratch-pad area
//! ([`blocked_usages_from_tasks`](UsageQueueInner::blocked_usages_from_tasks)) to stash blocked
//! ones. So, a conflict only incurs some additional fixed number of mem stores, within the error
//! margin of the constant complexity. And additional memory allocation for the scratchpad could be
//! said to be amortized, if such an unusual event should occur.
//!
//! [`Arc`] is used to implement this preloading mechanism, because `UsageQueue`s are shared across
//! tasks accessing the same account, and among threads due to the preloading. Also, interior
//! mutability is needed. However, `SchedulingStateMachine` doesn't use conventional locks like
//! RwLock. Leveraging the fact it's the only state-mutating exclusive thread, it instead uses
//! `UnsafeCell`, which is sugar-coated by a tailored wrapper called [`TokenCell`]. `TokenCell`
//! imposes an overly restrictive aliasing rule via the Rust type system to maintain memory safety.
//! By localizing any synchronization to the message passing, the scheduling code itself attains
//! the fastest possible single-threaded execution without stalling CPU pipelines at all,
//! constrained only by memory access latency, while efficiently utilizing the L1-L3 CPU caches
//! full of `UsageQueue`s.
//!
//! ### Buffer bloat insignificance
//!
//! The scheduler code itself doesn't care about the buffer bloat problem, which can occur in the
//! unified scheduler, where a run of heavily linearized and blocked tasks could be severely
//! hampered by a very large number of interleaved runnable tasks alongside. The reason is, again,
//! separation of concerns. This is acceptable because the scheduling code itself isn't susceptible
//! to the buffer bloat problem by itself, as explained by the description and validated by the
//! benchmark mentioned above. Thus, this should be solved elsewhere, specifically at the scheduler
//! pool.
107use {
108    crate::utils::{ShortCounter, Token, TokenCell},
109    assert_matches::assert_matches,
110    solana_pubkey::Pubkey,
111    solana_runtime_transaction::runtime_transaction::RuntimeTransaction,
112    solana_transaction::sanitized::SanitizedTransaction,
113    static_assertions::const_assert_eq,
114    std::{
115        cmp::Ordering,
116        collections::{BTreeMap, VecDeque},
117        mem,
118        sync::Arc,
119    },
120    unwrap_none::UnwrapNone,
121};
122
123#[derive(Clone, Copy, Debug, PartialEq)]
124pub enum SchedulingMode {
125    BlockVerification,
126    BlockProduction,
127}
128
129#[derive(Debug)]
130pub enum Capability {
    /// Basic capability of simple fifo queueing. This is intended for block verification.
    FifoQueueing,
    /// Strictly superset capability of priority queueing with reordering of tasks by task_id.
    /// This is intended for block production.
    /// In other words, any use of FifoQueueing can safely be replaced with PriorityQueueing, at
    /// the cost of being slower due to the use of more expensive collections.
    PriorityQueueing,
}
139
/// This type alias is intentionally not exposed to the public API with `pub`. The choice of
/// explicit `u32`, rather than the more neutral `usize`, is an implementation detail to squeeze
/// the CPU-cache footprint as much as possible.
/// Note that the usage of `u32` is safe because `SchedulingStateMachine` is expected to be
/// `reinitialize()`-d rather quickly after a short period: 1 slot for block verification, 4 (or
/// up to 8) consecutive slots for block production.
146type CounterInner = u32;
147
148pub type OrderedTaskId = u128;
149
150/// Internal utilities. Namely this contains [`ShortCounter`] and [`TokenCell`].
151mod utils {
152    use {
153        crate::CounterInner,
154        std::{
155            any::{self, TypeId},
156            cell::{RefCell, UnsafeCell},
157            collections::BTreeSet,
158            marker::PhantomData,
159            thread,
160        },
161    };
162
    /// A really tiny counter to hide `.checked_{add,sub}` all over the place.
    ///
    /// It's the caller's responsibility to ensure this (backed by [`CounterInner`]) never
    /// overflows.
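    ///
    /// A minimal usage sketch (crate-internal, so not a runnable doctest); it panics on
    /// overflow/underflow rather than wrapping:
    ///
    /// ```rust,ignore
    /// let mut counter = ShortCounter::zero();
    /// counter.increment_self();
    /// assert!(counter.is_one());
    /// assert!(counter.decrement().is_zero()); // `decrement()` returns a new value (Copy type)
    /// ```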
166    #[derive(Debug, Clone, Copy)]
167    pub(super) struct ShortCounter(CounterInner);
168
169    impl ShortCounter {
170        pub(super) fn zero() -> Self {
171            Self(0)
172        }
173
174        pub(super) fn one() -> Self {
175            Self(1)
176        }
177
178        pub(super) fn is_one(&self) -> bool {
179            self.0 == 1
180        }
181
182        pub(super) fn is_zero(&self) -> bool {
183            self.0 == 0
184        }
185
186        pub(super) fn current(&self) -> CounterInner {
187            self.0
188        }
189
190        #[must_use]
191        pub(super) fn increment(self) -> Self {
192            Self(self.0.checked_add(1).unwrap())
193        }
194
195        #[must_use]
196        pub(super) fn decrement(self) -> Self {
197            Self(self.0.checked_sub(1).unwrap())
198        }
199
200        pub(super) fn increment_self(&mut self) -> &mut Self {
201            *self = self.increment();
202            self
203        }
204
205        pub(super) fn decrement_self(&mut self) -> &mut Self {
206            *self = self.decrement();
207            self
208        }
209
210        pub(super) fn reset_to_zero(&mut self) -> &mut Self {
211            self.0 = 0;
212            self
213        }
214    }
215
    /// A conditionally [`Send`]-able and [`Sync`]-able cell leveraging the scheduler's one-by-one
    /// data access pattern with zero runtime synchronization cost.
    ///
    /// To comply with Rust's aliasing rules, these cells require a carefully-created [`Token`] to
    /// be passed around to access the inner values. The token is a special-purpose phantom object
    /// to get rid of the inherent `unsafe`-ness of [`UnsafeCell`], which is internally used for
    /// the interior mutability.
    ///
    /// The final objective of [`Token`] is to ensure there's at most one mutable reference to a
    /// [`TokenCell`] _at any given moment_. To that end, it's `unsafe` to create it, shifting the
    /// responsibility of binding the only singleton instance to a particular thread and not
    /// creating more than one, onto the API consumers. And its constructor is non-`const`, and
    /// the type is `!Clone` (and `!Copy` as well), `!Default`, `!Send` and `!Sync` to make it
    /// relatively hard to cross thread boundaries accidentally.
    ///
    /// In other words, the token semantically _owns_ all of its associated instances of
    /// [`TokenCell`]s. And `&mut Token` is needed to access one of them, as if the access were via
    /// one of [`Token`]'s `*_mut()` getters. Thus, the Rust aliasing rule for `UnsafeCell` can
    /// transitively be proven to be satisfied simply based on the usual borrow checking of the
    /// `&mut` reference of [`Token`] itself via
    /// [`::with_borrow_mut()`](TokenCell::with_borrow_mut).
    ///
    /// By extension, it's allowed to create _multiple_ tokens in a _single_ process as long as no
    /// instance of [`TokenCell`] is shared by multiple instances of [`Token`].
    ///
    /// Note that this is overly restrictive in that it's forbidden, yet technically possible,
    /// to _have multiple mutable references to the inner values at the same time, if and only
    /// if the respective cells aren't aliased to each other (i.e. different instances)_. This
    /// artificial restriction is acceptable for its intended use by the unified scheduler's code
    /// because its algorithm only needs to access each instance of [`TokenCell`]-ed data once at a
    /// time. Finally, this restriction is traded off for restoration of the Rust aliasing rule at
    /// zero runtime cost. Without this token mechanism, there's no way to realize this.
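    ///
    /// A minimal usage sketch (these items are crate-internal, so this is not a runnable
    /// doctest):
    ///
    /// ```rust,ignore
    /// // One token per thread, created exactly once for the value type.
    /// let mut token = unsafe { Token::<Vec<u8>>::assume_exclusive_mutating_thread() };
    /// let cell = TokenCell::new(Vec::new());
    ///
    /// // The mutable borrow of `token` proves exclusive access to the cell's contents.
    /// cell.with_borrow_mut(&mut token, |v| v.push(42));
    /// ```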
248    #[derive(Debug, Default)]
249    pub(super) struct TokenCell<V>(UnsafeCell<V>);
250
251    impl<V> TokenCell<V> {
252        /// Creates a new `TokenCell` with the `value` typed as `V`.
253        ///
        /// Note that this isn't parametric over its accompanying `Token`'s lifetime, to avoid
        /// complex handling of non-`'static` heaped data in general. Instead, it's manually
        /// required to ensure this instance is accessed only via its associated Token for the
        /// entire lifetime.
258        ///
259        /// This is intentionally left to be non-`const` to forbid unprotected sharing via static
260        /// variables among threads.
261        pub(super) fn new(value: V) -> Self {
262            Self(UnsafeCell::new(value))
263        }
264
265        /// Acquires a mutable reference inside a given closure, while borrowing the mutable
266        /// reference of the given token.
267        ///
        /// In this way, no additional reborrow can happen at the same time across all
        /// instances of [`TokenCell<V>`] conceptually owned by the instance of [`Token<V>`] (a
        /// particular thread), unless the previous borrow is released. After the release, the
        /// used singleton token should be free to be reused for reborrows.
        ///
        /// Note that the lifetime of the acquired reference is still restricted to `'self`, not
        /// `'token`, in order to avoid use-after-free undefined behaviors.
275        pub(super) fn with_borrow_mut<R>(
276            &self,
277            _token: &mut Token<V>,
278            f: impl FnOnce(&mut V) -> R,
279        ) -> R {
280            f(unsafe { &mut *self.0.get() })
281        }
282    }
283
    // Safety: Once a (`Send`-able) `TokenCell` is transferred to a thread from other threads,
    // access to the `TokenCell` is assumed to happen only from that single thread, by proper use
    // of Token. Thereby, implementing `Sync` can be thought of as safe, and doing so is needed for
    // the particular implementation pattern in the unified scheduler (multi-threaded off-loading).
    //
    // In other words, TokenCell is technically still `!Sync`. But there should be no
    // legalized usage which depends on real `Sync`, to avoid undefined behaviors.
291    unsafe impl<V> Sync for TokenCell<V> {}
292
    /// An auxiliary zero-sized type to enforce the aliasing rule for [`TokenCell`] via the Rust
    /// type system.
    ///
    /// Token semantically owns a collection of `TokenCell` objects and governs the _unique_
    /// existence of mutable access over them by requiring the token itself to be mutably borrowed
    /// to get a mutable reference to the internal value of `TokenCell`.
298    // *mut is used to make this type !Send and !Sync
299    pub(super) struct Token<V: 'static>(PhantomData<*mut V>);
300
301    impl<V> Token<V> {
302        /// Returns the token to acquire a mutable reference to the inner value of [TokenCell].
303        ///
304        /// This is intentionally left to be non-`const` to forbid unprotected sharing via static
305        /// variables among threads.
306        ///
307        /// # Panics
308        ///
        /// This function will `panic!()` if called multiple times with the same type `V` from the
        /// same thread, to detect potential misuses.
        ///
        /// # Safety
        ///
        /// This method should be called at most once per thread to avoid undefined behavior when
        /// used with [`Token`].
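        ///
        /// A usage sketch (crate-internal, hence not a runnable doctest): a consumer thread mints
        /// its tokens once, one per cell value type, right after being spawned.
        ///
        /// ```rust,ignore
        /// // Safe only because this thread is the sole mutator of the associated TokenCells.
        /// let mut usage_queue_token =
        ///     unsafe { Token::<UsageQueueInner>::assume_exclusive_mutating_thread() };
        /// let mut count_token = unsafe { Token::<ShortCounter>::assume_exclusive_mutating_thread() };
        /// ```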
316        #[must_use]
317        pub(super) unsafe fn assume_exclusive_mutating_thread() -> Self {
318            thread_local! {
319                static TOKENS: RefCell<BTreeSet<TypeId>> = const { RefCell::new(BTreeSet::new()) };
320            }
            // TOKENS.with_borrow_mut can't panic because it's the only non-overlapping
            // bound-to-local-variable borrow of the _thread local_ variable.
323            assert!(
324                TOKENS.with_borrow_mut(|tokens| tokens.insert(TypeId::of::<Self>())),
325                "{:?} is wrongly initialized twice on {:?}",
326                any::type_name::<Self>(),
327                thread::current()
328            );
329
330            Self(PhantomData)
331        }
332    }
333
334    #[cfg(test)]
335    mod tests {
336        use {
337            super::{Token, TokenCell},
338            std::{mem, sync::Arc, thread},
339        };
340
341        #[test]
342        #[should_panic(
343            expected = "\"solana_unified_scheduler_logic::utils::Token<usize>\" is wrongly \
344                        initialized twice on Thread"
345        )]
346        fn test_second_creation_of_tokens_in_a_thread() {
347            unsafe {
348                let _ = Token::<usize>::assume_exclusive_mutating_thread();
349                let _ = Token::<usize>::assume_exclusive_mutating_thread();
350            }
351        }
352
353        #[derive(Debug)]
354        struct FakeQueue {
355            v: Vec<u8>,
356        }
357
358        // As documented above, it's illegal to create multiple tokens inside a single thread to
359        // acquire multiple mutable references to the same TokenCell at the same time.
360        #[test]
361        // Trigger (harmless) UB unless running under miri by conditionally #[ignore]-ing,
362        // confirming false-positive result to conversely show the merit of miri!
363        #[cfg_attr(miri, ignore)]
364        fn test_ub_illegally_created_multiple_tokens() {
365            // Unauthorized token minting!
366            let mut token1 = unsafe { mem::transmute::<(), Token<FakeQueue>>(()) };
367            let mut token2 = unsafe { mem::transmute::<(), Token<FakeQueue>>(()) };
368
369            let queue = TokenCell::new(FakeQueue {
370                v: Vec::with_capacity(20),
371            });
372            queue.with_borrow_mut(&mut token1, |queue_mut1| {
373                queue_mut1.v.push(1);
374                queue.with_borrow_mut(&mut token2, |queue_mut2| {
375                    queue_mut2.v.push(2);
376                    queue_mut1.v.push(3);
377                });
378                queue_mut1.v.push(4);
379            });
380
381            // It's in ub already, so we can't assert reliably, so dbg!(...) just for fun
382            #[cfg(not(miri))]
383            dbg!(queue.0.into_inner());
384
385            // Return successfully to indicate an unexpected outcome, because this test should
386            // have aborted by now.
387        }
388
        // As documented above, it's illegal to share (= co-own) the same instance of TokenCell
        // across threads. Unfortunately, we can't prevent this from happening with some
        // type-safety magic to cause compile errors... So, as a sanity check, this test fails due
        // to a runtime error from the known UB when run under miri.
393        #[test]
394        // Trigger (harmless) UB unless running under miri by conditionally #[ignore]-ing,
395        // confirming false-positive result to conversely show the merit of miri!
396        #[cfg_attr(miri, ignore)]
397        fn test_ub_illegally_shared_token_cell() {
398            let queue1 = Arc::new(TokenCell::new(FakeQueue {
399                v: Vec::with_capacity(20),
400            }));
401            let queue2 = queue1.clone();
402            #[cfg(not(miri))]
403            let queue3 = queue1.clone();
404
            // Usually miri immediately detects the data race; but just repeat enough times to
            // avoid being flaky
407            for _ in 0..10 {
408                let (queue1, queue2) = (queue1.clone(), queue2.clone());
409                let thread1 = thread::spawn(move || {
410                    let mut token = unsafe { Token::assume_exclusive_mutating_thread() };
411                    queue1.with_borrow_mut(&mut token, |queue| {
412                        // this is UB
413                        queue.v.push(3);
414                    });
415                });
                // Immediately spawn the next thread without joining thread1 to ensure there's
                // definitely a data race. Otherwise, joining here wouldn't cause UB.
418                let thread2 = thread::spawn(move || {
419                    let mut token = unsafe { Token::assume_exclusive_mutating_thread() };
420                    queue2.with_borrow_mut(&mut token, |queue| {
421                        // this is UB
422                        queue.v.push(4);
423                    });
424                });
425
426                thread1.join().unwrap();
427                thread2.join().unwrap();
428            }
429
430            // It's in ub already, so we can't assert reliably, so dbg!(...) just for fun
431            #[cfg(not(miri))]
432            {
433                drop((queue1, queue2));
434                dbg!(Arc::into_inner(queue3).unwrap().0.into_inner());
435            }
436
437            // Return successfully to indicate an unexpected outcome, because this test should
438            // have aborted by now
439        }
440    }
441}
442
/// [`Result`] for locking a [usage_queue](UsageQueue) with a particular
/// [current_usage](RequestedUsage).
445type LockResult = Result<(), ()>;
446const_assert_eq!(mem::size_of::<LockResult>(), 1);
447
448/// Something to be scheduled; usually a wrapper of [`SanitizedTransaction`].
449pub type Task = Arc<TaskInner>;
450const_assert_eq!(mem::size_of::<Task>(), 8);
451
452pub type BlockSize = usize;
453pub const NO_CONSUMED_BLOCK_SIZE: BlockSize = 0;
454
455/// [`Token`] for [`UsageQueue`].
456type UsageQueueToken = Token<UsageQueueInner>;
457const_assert_eq!(mem::size_of::<UsageQueueToken>(), 0);
458
459/// [`Token`] for [task](Task)'s [internal mutable data](`TaskInner::blocked_usage_count`).
460type BlockedUsageCountToken = Token<ShortCounter>;
461const_assert_eq!(mem::size_of::<BlockedUsageCountToken>(), 0);
462
463/// Internal scheduling data about a particular task.
464#[derive(Debug)]
465pub struct TaskInner {
466    transaction: RuntimeTransaction<SanitizedTransaction>,
    /// For block verification, the index of a transaction in ledger entries. Carrying this along
    /// with the transaction is needed to properly record its execution result.
    /// For block production, the priority of a transaction for reordering with
    /// Capability::PriorityQueueing. Note that the index of a transaction in ledger entries is
    /// dynamically generated from the PoH in the case of block production.
472    task_id: OrderedTaskId,
473    lock_contexts: Vec<LockContext>,
474    /// The number of remaining usages which are currently occupied by other tasks. In other words,
475    /// the task is said to be _blocked_ and needs to be _unblocked_ exactly this number of times
476    /// before running.
477    blocked_usage_count: TokenCell<ShortCounter>,
478    consumed_block_size: BlockSize,
479}
480
481impl TaskInner {
482    pub fn task_id(&self) -> OrderedTaskId {
483        self.task_id
484    }
485
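    /// Returns `true` if `self` should run before `other`. A smaller [`OrderedTaskId`] means
    /// higher priority; comparing a task with itself is a logic error and panics.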
486    pub fn is_higher_priority(&self, other: &Self) -> bool {
487        match self.task_id().cmp(&other.task_id()) {
488            Ordering::Less => true,
489            Ordering::Greater => false,
            Ordering::Equal => panic!("self-comparison"),
491        }
492    }
493
494    pub fn consumed_block_size(&self) -> BlockSize {
495        self.consumed_block_size
496    }
497
498    pub fn transaction(&self) -> &RuntimeTransaction<SanitizedTransaction> {
499        &self.transaction
500    }
501
502    fn lock_contexts(&self) -> &[LockContext] {
503        &self.lock_contexts
504    }
505
506    fn set_blocked_usage_count(&self, token: &mut BlockedUsageCountToken, count: ShortCounter) {
507        self.blocked_usage_count
508            .with_borrow_mut(token, |usage_count| {
509                *usage_count = count;
510            })
511    }
512
513    /// Try to change the counter's state of this task towards the runnable state (called
514    /// _unblocking_), returning itself if finished completely.
515    ///
516    /// This should be called exactly once each time one of blocked usages of this task is newly
517    /// unlocked for proper book-keeping. Eventually, when this particular unblocking is determined
518    /// to be the last (i.e. [`Self::blocked_usage_count`] reaches to 0), this task should be run
519    /// by consuming the returned task itself properly.
520    #[must_use]
521    fn try_unblock(self: Task, token: &mut BlockedUsageCountToken) -> Option<Task> {
522        let did_unblock = self
523            .blocked_usage_count
524            .with_borrow_mut(token, |usage_count| usage_count.decrement_self().is_zero());
525        did_unblock.then_some(self)
526    }
527
    /// Try to change the counter's state of this task away from the runnable state (called
    /// _reblocking_), returning `true` if it succeeded.
    ///
    /// This should be called with care to stay consistent with the usage queue's
    /// [`blocked_usages_from_tasks`](UsageQueueInner::Priority::blocked_usages_from_tasks).
    /// Blocked usage counts of tasks are usually expected only to decrement over time via
    /// [unblocking](Self::try_unblock). However, sometimes the opposite is needed (with
    /// [`Capability::PriorityQueueing`]). In other words, a previously successfully acquired usage
    /// must be taken from a task to assign that usage to an even higher-priority task. Note that
    /// this can't be done if usage_count has already reached 0, meaning it's possible for the task
    /// to be running already. In that case, this method returns `false` with no state change.
    /// Otherwise, it returns `true` after incrementing [`Self::blocked_usage_count`].
540    fn try_reblock(&self, token: &mut BlockedUsageCountToken) -> bool {
541        self.blocked_usage_count
542            .with_borrow_mut(token, |usage_count| {
543                if usage_count.is_zero() {
544                    false
545                } else {
546                    usage_count.increment_self();
547                    true
548                }
549            })
550    }
551
552    pub fn into_transaction(self: Task) -> RuntimeTransaction<SanitizedTransaction> {
553        Task::into_inner(self).unwrap().transaction
554    }
555}
556
557/// [`Task`]'s per-address context to lock a [usage_queue](UsageQueue) with [certain kind of
558/// request](RequestedUsage).
559#[derive(Debug)]
560struct LockContext {
561    usage_queue: UsageQueue,
562    requested_usage: RequestedUsage,
563}
564const_assert_eq!(mem::size_of::<LockContext>(), 16);
565
566impl LockContext {
567    fn new(usage_queue: UsageQueue, requested_usage: RequestedUsage) -> Self {
568        Self {
569            usage_queue,
570            requested_usage,
571        }
572    }
573
574    fn with_usage_queue_mut<R>(
575        &self,
576        usage_queue_token: &mut UsageQueueToken,
577        f: impl FnOnce(&mut UsageQueueInner) -> R,
578    ) -> R {
579        self.usage_queue.0.with_borrow_mut(usage_queue_token, f)
580    }
581}
582
583/// Status about how the [`UsageQueue`] is used currently.
584#[derive(Copy, Clone, Debug)]
585enum Usage<R, W> {
586    Readonly(R),
587    Writable(W),
588}
589
590impl<R, W> Usage<R, W> {
591    fn requested_usage(&self) -> RequestedUsage {
592        match self {
593            Self::Readonly(_) => RequestedUsage::Readonly,
594            Self::Writable(_) => RequestedUsage::Writable,
595        }
596    }
597}
598
599type FifoUsage = Usage<ShortCounter, ()>;
600const_assert_eq!(mem::size_of::<FifoUsage>(), 8);
601
// PriorityUsage will temporarily contain current tasks, unlike its very light-weight cousin (i.e.
// FifoUsage). This arrangement is needed for reblocking (see the prepare_lock() method).
//
// Considering it may reside at usage_queue.current_usage, practically this means each address
// (`Pubkey`) may instantiate its own one independently, exactly in the same manner as
// usage_queue.blocked_usages_from_tasks.
//
// The reblocking algorithm involves removal keyed by task_id on arbitrary-ordered unlocking
// and a ranged query to handle reblocking of current readonly usages (if any). Currently,
// BTreeMap is chosen mainly for its implementation simplicity and acceptable efficiency. This
// might be replaced with a more efficient implementation in the future.
613type PriorityUsage = Usage<BTreeMap<OrderedTaskId, Task>, Task>;
614const_assert_eq!(mem::size_of::<PriorityUsage>(), 32);
615
616impl From<RequestedUsage> for FifoUsage {
617    fn from(requested_usage: RequestedUsage) -> Self {
618        match requested_usage {
619            RequestedUsage::Readonly => Self::Readonly(ShortCounter::one()),
620            RequestedUsage::Writable => Self::Writable(()),
621        }
622    }
623}
624
625impl PriorityUsage {
626    fn from(task: Task, requested_usage: RequestedUsage) -> Self {
627        match requested_usage {
628            RequestedUsage::Readonly => Self::Readonly(BTreeMap::from([(task.task_id(), task)])),
629            RequestedUsage::Writable => Self::Writable(task),
630        }
631    }
632
633    fn take_readable(maybe_usage: &mut Option<Self>) {
634        let Some(Self::Readonly(tasks)) = maybe_usage.take() else {
635            panic!();
636        };
637        assert!(tasks.is_empty());
638    }
639
640    fn take_writable(maybe_usage: &mut Option<Self>) -> Task {
641        let Some(Self::Writable(task)) = maybe_usage.take() else {
642            panic!();
643        };
644        task
645    }
646}
647
648/// Status about how a task is requesting to use a particular [`UsageQueue`].
649#[derive(Clone, Copy, Debug)]
650enum RequestedUsage {
651    Readonly,
652    Writable,
653}
654
655// BTreeMap is needed for now for efficient manipulation...
656type PriorityUsageQueue = BTreeMap<OrderedTaskId, UsageFromTask>;
657
658trait PriorityUsageQueueExt: Sized {
659    fn insert_usage_from_task(&mut self, usage_from_task: UsageFromTask);
660    fn pop_first_usage_from_task(&mut self) -> Option<UsageFromTask>;
661    fn first_usage_from_task(&self) -> Option<&UsageFromTask>;
662}
663
664impl PriorityUsageQueueExt for PriorityUsageQueue {
665    fn insert_usage_from_task(&mut self, usage_from_task: UsageFromTask) {
666        self.insert(usage_from_task.1.task_id(), usage_from_task)
667            .unwrap_none();
668    }
669
670    fn pop_first_usage_from_task(&mut self) -> Option<UsageFromTask> {
671        self.pop_first().map(|(_index, usage)| usage)
672    }
673
674    fn first_usage_from_task(&self) -> Option<&UsageFromTask> {
675        self.first_key_value().map(|(_index, usage)| usage)
676    }
677}
678
679/// Internal scheduling data about a particular address.
680///
/// Specifically, it holds the current [`Usage`] (or no usage with `None`) and which [`Task`]s are
/// blocked from being executed until the current task is notified to be finished via
/// [`::deschedule_task`](`SchedulingStateMachine::deschedule_task`).
684#[derive(Debug)]
685enum UsageQueueInner {
686    Fifo {
687        current_usage: Option<FifoUsage>,
688        blocked_usages_from_tasks: VecDeque<UsageFromTask>,
689    },
690    Priority {
691        current_usage: Option<PriorityUsage>,
692        blocked_usages_from_tasks: PriorityUsageQueue,
693    },
694}
695
696type UsageFromTask = (RequestedUsage, Task);
697
698impl UsageQueueInner {
699    fn with_fifo() -> Self {
700        Self::Fifo {
701            current_usage: None,
            // Capacity should be configurable so it can be created with a large capacity like 1024
            // inside the (multi-threaded) closures passed to create_task(). In this way, reallocs
            // can be avoided in the scheduler thread. Also, this configurability is desired for
            // unified-scheduler-logic's motto: separation of concerns (the pure logic should be
            // sufficiently distanced from any random knob constants needed for messy reality, per
            // the author's personal preference...).
            //
            // Note that a large cap should be accompanied by proper scheduler cleaning after use,
            // which should be handled by higher layers (i.e. the scheduler pool).
711            blocked_usages_from_tasks: VecDeque::with_capacity(128),
712        }
713    }
714
715    fn with_priority() -> Self {
716        Self::Priority {
717            current_usage: None,
            // PriorityUsageQueue (i.e. BTreeMap) doesn't support capacity provisioning, unlike the
            // VecDeque above. For efficient key-based lookup, BTreeMap can't usually be backed by
            // a contiguous, provisioning-friendly collection (i.e. Vec). And, because the current
            // implementation needs those lookups, we can't use BinaryHeap and its family _for
            // now_.
723            blocked_usages_from_tasks: PriorityUsageQueue::new(),
724        }
725    }
726
727    fn new(capability: &Capability) -> Self {
728        match capability {
729            Capability::FifoQueueing => Self::with_fifo(),
730            Capability::PriorityQueueing => Self::with_priority(),
731        }
732    }
733}
734
735impl UsageQueueInner {
736    fn try_lock(&mut self, new_task: &Task, requested_usage: RequestedUsage) -> LockResult {
737        match self {
738            Self::Fifo { current_usage, .. } => match current_usage {
739                None => Ok(FifoUsage::from(requested_usage)),
740                Some(FifoUsage::Readonly(count)) => match requested_usage {
741                    RequestedUsage::Readonly => Ok(FifoUsage::Readonly(count.increment())),
742                    RequestedUsage::Writable => Err(()),
743                },
744                Some(FifoUsage::Writable(())) => Err(()),
745            }
746            .map(|new_usage| {
747                *current_usage = Some(new_usage);
748            }),
749            Self::Priority { current_usage, .. } => match current_usage {
750                Some(PriorityUsage::Readonly(tasks)) => match requested_usage {
751                    RequestedUsage::Readonly => {
752                        tasks
753                            .insert(new_task.task_id(), new_task.clone())
754                            .unwrap_none();
755                        Ok(())
756                    }
757                    RequestedUsage::Writable => Err(()),
758                },
759                Some(PriorityUsage::Writable(_task)) => Err(()),
760                None => {
761                    *current_usage = Some(PriorityUsage::from(new_task.clone(), requested_usage));
762
763                    Ok(())
764                }
765            },
766        }
767    }
768
769    #[must_use]
770    fn unlock(&mut self, task: &Task, requested_usage: RequestedUsage) -> Option<UsageFromTask> {
771        let mut is_newly_lockable = false;
772        match self {
773            Self::Fifo { current_usage, .. } => {
774                match current_usage {
775                    Some(FifoUsage::Readonly(ref mut count)) => match requested_usage {
776                        RequestedUsage::Readonly => {
777                            if count.is_one() {
778                                is_newly_lockable = true;
779                            } else {
780                                count.decrement_self();
781                            }
782                        }
783                        RequestedUsage::Writable => unreachable!(),
784                    },
785                    Some(FifoUsage::Writable(())) => {
786                        assert_matches!(requested_usage, RequestedUsage::Writable);
787                        is_newly_lockable = true;
788                    }
789                    None => unreachable!(),
790                }
791                if is_newly_lockable {
792                    *current_usage = None;
793                }
794            }
795            Self::Priority { current_usage, .. } => {
796                match current_usage {
797                    Some(PriorityUsage::Readonly(tasks)) => match requested_usage {
798                        RequestedUsage::Readonly => {
799                            // Don't skip remove()-ing to assert the existence of the last task.
800                            tasks.remove(&task.task_id()).unwrap();
801                            if tasks.is_empty() {
802                                is_newly_lockable = true;
803                            }
804                        }
805                        RequestedUsage::Writable => unreachable!(),
806                    },
807                    Some(PriorityUsage::Writable(_task)) => {
808                        assert_matches!(requested_usage, RequestedUsage::Writable);
809                        is_newly_lockable = true;
810                    }
811                    None => unreachable!(),
812                }
813                if is_newly_lockable {
814                    *current_usage = None;
815                }
816            }
817        }
818
819        if is_newly_lockable {
820            self.pop()
821        } else {
822            None
823        }
824    }
825
826    fn push_blocked(&mut self, usage_from_task: UsageFromTask) {
827        assert_matches!(self.current_usage(), Some(_));
828        self.push(usage_from_task);
829    }
830
831    #[must_use]
832    fn pop_lockable_readonly(&mut self) -> Option<UsageFromTask> {
833        if matches!(self.peek_blocked(), Some((RequestedUsage::Readonly, _))) {
834            assert_matches!(self.current_usage(), Some(RequestedUsage::Readonly));
835            self.pop()
836        } else {
837            None
838        }
839    }
840
841    fn current_usage(&self) -> Option<RequestedUsage> {
842        match self {
843            Self::Fifo { current_usage, .. } => {
844                current_usage.as_ref().map(|usage| usage.requested_usage())
845            }
846            Self::Priority { current_usage, .. } => {
847                current_usage.as_ref().map(|usage| usage.requested_usage())
848            }
849        }
850    }
851
852    #[cfg(test)]
853    fn update_current_usage(&mut self, requested_usage: RequestedUsage, task: &Task) {
854        match self {
855            Self::Fifo { current_usage, .. } => {
856                *current_usage = Some(FifoUsage::from(requested_usage));
857            }
858            Self::Priority { current_usage, .. } => {
859                *current_usage = Some(PriorityUsage::from(task.clone(), requested_usage));
860            }
861        }
862    }
863
864    fn pop(&mut self) -> Option<UsageFromTask> {
865        match self {
866            Self::Fifo {
867                blocked_usages_from_tasks,
868                ..
869            } => blocked_usages_from_tasks.pop_front(),
870            Self::Priority {
871                blocked_usages_from_tasks,
872                ..
873            } => blocked_usages_from_tasks.pop_first_usage_from_task(),
874        }
875    }
876
877    fn push(&mut self, usage_from_task: UsageFromTask) {
878        match self {
879            Self::Fifo {
880                blocked_usages_from_tasks,
881                ..
882            } => blocked_usages_from_tasks.push_back(usage_from_task),
883            Self::Priority {
884                blocked_usages_from_tasks,
885                ..
886            } => blocked_usages_from_tasks.insert_usage_from_task(usage_from_task),
887        }
888    }
889
890    fn peek_blocked(&self) -> Option<&UsageFromTask> {
891        match self {
892            Self::Fifo {
893                blocked_usages_from_tasks,
894                ..
895            } => blocked_usages_from_tasks.front(),
896            Self::Priority {
897                blocked_usages_from_tasks,
898                ..
899            } => blocked_usages_from_tasks.first_usage_from_task(),
900        }
901    }
902
903    fn prepare_lock(
904        &mut self,
905        token: &mut BlockedUsageCountToken,
906        new_task: &Task,
907        requested_usage: RequestedUsage,
908    ) -> LockResult {
909        match self {
910            Self::Fifo {
911                blocked_usages_from_tasks,
912                ..
913            } => {
914                if blocked_usages_from_tasks.is_empty() {
915                    Ok(())
916                } else {
917                    Err(())
918                }
919            }
            // This is the heart of our priority queue mechanism, called reblocking. Understanding
            // it needs a bit of a twist of thinking.
            //
            // First, recall that the entire logic of SchedulingStateMachine is about fifo
            // queueing, whose state transitions are guaranteed to complete with the nice property
            // of _bounded_ computation at each tick.
            //
            // This priority reordering wants to exploit that. Namely, the following code mangles
            // the queue upon arrival of new higher-priority tasks, immediately before the actual
            // locking (thus this fn is called `prepare_lock()`), _as if_ those tasks had arrived
            // earlier, in the precise order of their task_ids. Then, all other code works nicely
            // with priority ordering enabled.
            //
            // Note that reblocking must be consistently applied across all usage queues.
            // Otherwise, a deadlock would happen.
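            //
            // An illustrative sketch (task ids here are hypothetical; a smaller task_id means
            // higher priority):
            //   - current_usage = Writable(task #7), blocked queue empty.
            //   - new_task #3 requests Readonly: assuming #7 is still blocked elsewhere (i.e. not
            //     running), #7 is reblocked (its blocked_usage_count is incremented and it is
            //     pushed into blocked_usages_from_tasks), and #3 then locks as if it had arrived
            //     first.
            //   - If the new task were #9 instead (lower priority), prepare_lock() would return
            //     Err(()) and #9 would be queued behind #7 as usual.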
935            Self::Priority {
936                current_usage,
937                blocked_usages_from_tasks,
938            } => {
                // This artificial var is needed to pacify the Rust borrow checker...
940                let mut current_and_requested_usage = (current_usage, requested_usage);
941
942                match &mut current_and_requested_usage {
943                    (None, _) => {
944                        assert!(blocked_usages_from_tasks.is_empty());
945                        Ok(())
946                    }
947                    (Some(PriorityUsage::Writable(current_task)), _requested_usage) => {
948                        if !new_task.is_higher_priority(current_task)
949                            || !current_task.try_reblock(token)
950                        {
951                            return Err(());
952                        }
953                        let reblocked_task = Usage::take_writable(current_and_requested_usage.0);
954                        blocked_usages_from_tasks
955                            .insert_usage_from_task((RequestedUsage::Writable, reblocked_task));
956                        Ok(())
957                    }
958                    (Some(PriorityUsage::Readonly(_current_tasks)), RequestedUsage::Readonly) => {
959                        let Some((peeked_usage, peeked_task)) = self.peek_blocked() else {
960                            return Ok(());
961                        };
962
                        // Current usage is Readonly. This means that the highest-priority
                        // blocked task must be Writable. So, we assert this here as a
                        // precaution. Note that peeked_usage must be Writable regardless of
                        // whether the requested usage is Readonly or Writable.
967                        assert_matches!(peeked_usage, RequestedUsage::Writable);
968                        if !new_task.is_higher_priority(peeked_task) {
969                            return Err(());
970                        }
971                        Ok(())
972                    }
973                    (Some(PriorityUsage::Readonly(current_tasks)), RequestedUsage::Writable) => {
                        // First, we need to determine whether the write-requesting new_task could
                        // reblock the current read-only tasks _very efficiently while bounded in
                        // the worst case_, to prevent a large number of low-priority tasks from
                        // consuming an undue amount of cpu cycles for nothing.

                        // Use extract_if once stabilized to remove the Vec creation and the
                        // repeated remove()s...
981                        let task_indexes = current_tasks
982                            .range(new_task.task_id()..)
983                            .filter_map(|(&task_id, task)| {
984                                task.try_reblock(token).then_some(task_id)
985                            })
986                            .collect::<Vec<OrderedTaskId>>();
987                        for task_id in task_indexes.into_iter() {
988                            let reblocked_task = current_tasks.remove(&task_id).unwrap();
989                            blocked_usages_from_tasks
990                                .insert_usage_from_task((RequestedUsage::Readonly, reblocked_task));
991                        }
992
993                        if current_tasks.is_empty() {
994                            Usage::take_readable(current_and_requested_usage.0);
995                            Ok(())
996                        } else {
                            // In this case, new_task will still be inserted as the
                            // highest-priority blocked writable task, regardless of whether any
                            // readonly tasks were reblocked above. That's because all such tasks
                            // must be of lower priority than new_task by the very `range()` lookup
                            // above. So, the write-always-follows-read critical invariant is still
                            // intact. So is the assertion in the current-and-requested-readonly
                            // match arm.
1004                            Err(())
1005                        }
1006                    }
1007                }
1008            }
1009        }
1010    }
1011}
1012
1013const_assert_eq!(mem::size_of::<TokenCell<UsageQueueInner>>(), 56);
1014
/// Scheduler's internal data for each address ([`Pubkey`](`solana_pubkey::Pubkey`)). A very
/// opaque wrapper type; it exposes nothing beyond [`::new()`](UsageQueue::new) and
/// [`::clone()`](Clone::clone).
///
/// It's the higher layer's responsibility to ensure that the same instance of UsageQueue is
/// associated with a given Pubkey at the time of [task](Task) creation.
1021#[derive(Debug, Clone)]
1022pub struct UsageQueue(Arc<TokenCell<UsageQueueInner>>);
1023const_assert_eq!(mem::size_of::<UsageQueue>(), 8);
1024
1025impl UsageQueue {
1026    pub fn new(capability: &Capability) -> Self {
1027        Self(Arc::new(TokenCell::new(UsageQueueInner::new(capability))))
1028    }
1029}
1030
1031/// A high-level `struct`, managing the overall scheduling of [tasks](Task), to be used by
1032/// `solana-unified-scheduler-pool`.
1033pub struct SchedulingStateMachine {
1034    unblocked_task_queue: VecDeque<Task>,
1035    /// The number of all tasks which aren't `deschedule_task()`-ed yet while their ownership has
1036    /// already been transferred to SchedulingStateMachine by `schedule_or_buffer_task()`. In other
1037    /// words, they are running right now or buffered by explicit request or by implicit blocking
1038    /// due to one of other _active_ (= running or buffered) conflicting tasks.
1039    active_task_count: ShortCounter,
1040    /// The number of tasks which are running right now.
1041    running_task_count: ShortCounter,
    /// The maximum number of running tasks at any given moment. While this could be tightly
    /// related to the number of threads, the terminology here is intentionally abstracted away to
    /// keep this struct purely about logic. As a hypothetical counter-example, tasks could be
    /// IO-bound, in which case max_running_task_count would be coupled with the IO queue depth
    /// instead.
1046    max_running_task_count: CounterInner,
1047    handled_task_count: ShortCounter,
1048    unblocked_task_count: ShortCounter,
1049    total_task_count: ShortCounter,
1050    count_token: BlockedUsageCountToken,
1051    usage_queue_token: UsageQueueToken,
1052}
1053const_assert_eq!(mem::size_of::<SchedulingStateMachine>(), 56);
1054
1055impl SchedulingStateMachine {
1056    pub fn has_no_running_task(&self) -> bool {
1057        self.running_task_count.is_zero()
1058    }
1059
1060    pub fn has_no_active_task(&self) -> bool {
1061        self.active_task_count.is_zero()
1062    }
1063
1064    pub fn has_unblocked_task(&self) -> bool {
1065        !self.unblocked_task_queue.is_empty()
1066    }
1067
1068    pub fn has_runnable_task(&self) -> bool {
1069        self.has_unblocked_task() && self.is_task_runnable()
1070    }
1071
1072    fn is_task_runnable(&self) -> bool {
1073        self.running_task_count.current() < self.max_running_task_count
1074    }
1075
1076    pub fn unblocked_task_queue_count(&self) -> usize {
1077        self.unblocked_task_queue.len()
1078    }
1079
1080    #[cfg(test)]
1081    fn active_task_count(&self) -> CounterInner {
1082        self.active_task_count.current()
1083    }
1084
1085    #[cfg(test)]
1086    fn handled_task_count(&self) -> CounterInner {
1087        self.handled_task_count.current()
1088    }
1089
1090    #[cfg(test)]
1091    fn unblocked_task_count(&self) -> CounterInner {
1092        self.unblocked_task_count.current()
1093    }
1094
1095    #[cfg(test)]
1096    fn total_task_count(&self) -> CounterInner {
1097        self.total_task_count.current()
1098    }
1099
    /// Schedules the given `task`, returning it if successful.
    ///
    /// Returns `Some(task)` if it's immediately scheduled. Otherwise, returns `None`,
    /// indicating the scheduled task is currently blocked.
1104    ///
1105    /// Note that this function takes ownership of the task to allow for future optimizations.
1106    #[cfg(any(test, doc))]
1107    #[must_use]
1108    pub fn schedule_task(&mut self, task: Task) -> Option<Task> {
1109        self.schedule_or_buffer_task(task, false)
1110    }
1111
    /// Adds the given `task` to the internal buffer, even if it's otherwise immediately
    /// schedulable.
    ///
    /// Put differently, buffering means forcing the task to be blocked unconditionally after
    /// normal scheduling processing.
    ///
    /// Thus, the task is internally retained inside this [`SchedulingStateMachine`], whether the
    /// task is blocked or not. Eventually, the buffered task will be returned by one of the later
    /// invocations of [`schedule_next_unblocked_task()`](Self::schedule_next_unblocked_task).
    ///
    /// Note that this function takes ownership of the task to allow for future optimizations.
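    ///
    /// A minimal sketch of the difference from [`schedule_task()`](Self::schedule_task), assuming
    /// a non-conflicting `task` and a `state_machine` obtained elsewhere:
    ///
    /// ```rust,ignore
    /// state_machine.buffer_task(task); // always retained internally, never returned here
    /// // The task comes back only via schedule_next_unblocked_task():
    /// let task = state_machine.schedule_next_unblocked_task().unwrap();
    /// ```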
1122    pub fn buffer_task(&mut self, task: Task) {
1123        self.schedule_or_buffer_task(task, true).unwrap_none();
1124    }
1125
    /// Schedules or buffers the given `task`, returning it if successfully scheduled, unless
    /// buffering is forced.
    ///
    /// Refer to [`schedule_task()`](Self::schedule_task) and
    /// [`buffer_task()`](Self::buffer_task) for the difference between _scheduling_ and
    /// _buffering_, respectively.
    ///
    /// Note that this function takes ownership of the task to allow for future optimizations.
1133    #[must_use]
1134    pub fn schedule_or_buffer_task(&mut self, task: Task, force_buffering: bool) -> Option<Task> {
1135        self.total_task_count.increment_self();
1136        self.active_task_count.increment_self();
1137        self.try_lock_usage_queues(task).and_then(|task| {
1138            // locking succeeded, and then ...
1139            if !self.is_task_runnable() || force_buffering {
1140                // ... push to unblocked_task_queue, if buffering is forced.
1141                self.unblocked_task_count.increment_self();
1142                self.unblocked_task_queue.push_back(task);
1143                None
1144            } else {
1145                // ... return the task back as schedulable to the caller as-is otherwise.
1146                self.running_task_count.increment_self();
1147                Some(task)
1148            }
1149        })
1150    }
1151
1152    #[must_use]
1153    pub fn schedule_next_unblocked_task(&mut self) -> Option<Task> {
1154        if !self.is_task_runnable() {
1155            return None;
1156        }
1157
1158        self.unblocked_task_queue.pop_front().inspect(|_| {
1159            self.running_task_count.increment_self();
1160            self.unblocked_task_count.increment_self();
1161        })
1162    }
1163
    /// Deschedules the given scheduled `task`.
    ///
    /// This must be called exactly once for every scheduled task to uphold both
    /// `SchedulingStateMachine` and `UsageQueue` internal state consistency at any given moment of
    /// time. It's a serious logic error to call this twice with the same task, or not at all after
    /// scheduling. Similarly, calling this with a task that was never scheduled is also forbidden.
    ///
    /// Note that this function intentionally doesn't take ownership of the task, to avoid dropping
    /// tasks inside `SchedulingStateMachine` and to provide an offloading-based optimization
    /// opportunity for callers.
1174    pub fn deschedule_task(&mut self, task: &Task) {
1175        self.running_task_count.decrement_self();
1176        self.active_task_count.decrement_self();
1177        self.handled_task_count.increment_self();
1178        self.unlock_usage_queues(task);
1179    }
1180
1181    #[must_use]
1182    fn try_lock_usage_queues(&mut self, task: Task) -> Option<Task> {
1183        let mut blocked_usage_count = ShortCounter::zero();
1184
1185        for context in task.lock_contexts() {
1186            context.with_usage_queue_mut(&mut self.usage_queue_token, |usage_queue| {
1187                let lock_result = usage_queue
1188                    .prepare_lock(&mut self.count_token, &task, context.requested_usage)
1189                    .and_then(|()| usage_queue.try_lock(&task, context.requested_usage));
1190                if let Err(()) = lock_result {
1191                    blocked_usage_count.increment_self();
1192                    let usage_from_task = (context.requested_usage, task.clone());
1193                    usage_queue.push_blocked(usage_from_task);
1194                }
1195            });
1196        }
1197
1198        // no blocked usage count means success
1199        if blocked_usage_count.is_zero() {
1200            Some(task)
1201        } else {
1202            task.set_blocked_usage_count(&mut self.count_token, blocked_usage_count);
1203            None
1204        }
1205    }
1206
1207    fn unlock_usage_queues(&mut self, task: &Task) {
1208        for context in task.lock_contexts() {
1209            context.with_usage_queue_mut(&mut self.usage_queue_token, |usage_queue| {
1210                let mut newly_lockable = usage_queue.unlock(task, context.requested_usage);
1211                while let Some((lockable_usage, lockable_task)) = newly_lockable {
1212                    usage_queue
1213                        .try_lock(&lockable_task, lockable_usage)
1214                        .unwrap();
1215
1216                    // When `try_unblock()` returns `None`, unblocking failed this time around,
1217                    // meaning the task is still blocked by other active tasks' usages. So, don't
1218                    // push the task into unblocked_task_queue yet. As long as
1219                    // `SchedulingStateMachine` is used correctly, every task will eventually be
1220                    // unblocked and enter this condition clause.
1221                    if let Some(unblocked_task) = lockable_task.try_unblock(&mut self.count_token) {
1222                        self.unblocked_task_queue.push_back(unblocked_task);
1223                    }
1224
1225                    // Try to further schedule blocked task for parallelism in the case of readonly
1226                    // usages
1227                    newly_lockable = matches!(lockable_usage, RequestedUsage::Readonly)
1228                        .then(|| usage_queue.pop_lockable_readonly())
1229                        .flatten();
1230                }
1231            });
1232        }
1233    }
1234
1235    /// Creates a new task from a [`RuntimeTransaction<SanitizedTransaction>`] with all of
1236    /// its corresponding [`UsageQueue`]s preloaded.
1237    ///
1238    /// The closure (`usage_queue_loader`) is used to delegate the (possibly multi-threaded)
1239    /// implementation of [`UsageQueue`] look-up by [`pubkey`](Pubkey) to callers. It's the
1240    /// caller's responsibility to ensure the same instance is returned from the closure, given a
1241    /// particular pubkey.
1242    ///
1243    /// A closure is used here to delegate the responsibility of primary ownership of `UsageQueue`
1244    /// (and caching/pruning, if any) to the caller. `SchedulingStateMachine` guarantees that all
1245    /// shared ownership of `UsageQueue`s is released and that each `UsageQueue`'s state is
1246    /// identical to its just-created state whenever `has_no_active_task()` is `true`. Also note
1247    /// that this is desired for separation of concerns.
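    ///
    /// A minimal sketch of a single-threaded loader backed by a `HashMap` (hedged illustration:
    /// `transaction`, `task_id`, `capability`, and the map are the caller's own hypothetical
    /// values; `UsageQueue::new(&capability)` mirrors how this file's tests construct queues):
    ///
    /// ```ignore
    /// use std::collections::HashMap;
    ///
    /// let mut usage_queues: HashMap<Pubkey, UsageQueue> = HashMap::new();
    /// let task = SchedulingStateMachine::create_task(transaction, task_id, &mut |address| {
    ///     // Return the same (cheaply cloned) `UsageQueue` instance for the same address.
    ///     usage_queues
    ///         .entry(address)
    ///         .or_insert_with(|| UsageQueue::new(&capability))
    ///         .clone()
    /// });
    /// ```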
1248    pub fn create_task(
1249        transaction: RuntimeTransaction<SanitizedTransaction>,
1250        task_id: OrderedTaskId,
1251        usage_queue_loader: &mut impl FnMut(Pubkey) -> UsageQueue,
1252    ) -> Task {
1253        Self::do_create_task(
1254            transaction,
1255            task_id,
1256            NO_CONSUMED_BLOCK_SIZE,
1257            usage_queue_loader,
1258        )
1259    }
1260
1261    pub fn create_block_production_task(
1262        transaction: RuntimeTransaction<SanitizedTransaction>,
1263        task_id: OrderedTaskId,
1264        consumed_block_size: BlockSize,
1265        usage_queue_loader: &mut impl FnMut(Pubkey) -> UsageQueue,
1266    ) -> Task {
1267        Self::do_create_task(
1268            transaction,
1269            task_id,
1270            consumed_block_size,
1271            usage_queue_loader,
1272        )
1273    }
1274
1275    fn do_create_task(
1276        transaction: RuntimeTransaction<SanitizedTransaction>,
1277        task_id: OrderedTaskId,
1278        consumed_block_size: BlockSize,
1279        usage_queue_loader: &mut impl FnMut(Pubkey) -> UsageQueue,
1280    ) -> Task {
1281        // It's crucial for tasks to be validated with
1282        // `account_locks::validate_account_locks()` prior to the creation.
1283        // That's because it's part of protocol consensus regarding the
1284        // rejection of blocks containing malformed transactions
1285        // (`AccountLoadedTwice` and `TooManyAccountLocks`). Moreover,
1286        // `SchedulingStateMachine` can't properly handle transactions with
1287        // duplicate addresses (those falling under `AccountLoadedTwice`).
1288        //
1289        // However, it's okay for now not to call `::validate_account_locks()`
1290        // here.
1291        //
1292        // Currently `replay_stage` is always calling
1293        // `::validate_account_locks()` regardless of whether unified-scheduler
1294        // is enabled or not at the blockstore
1295        // (`Bank::prepare_sanitized_batch()` is called in
1296        // `process_entries()`). This verification will be hoisted for
1297        // optimization when removing
1298        // `--block-verification-method=blockstore-processor`.
1299        //
1300        // As for `banking_stage` with unified scheduler, it will need to run
1301        // `validate_account_locks()` at least once somewhere in the code path.
1302        // In the distant future, this function (`create_task()`) should be
1303        // adjusted so that both stages do the checks before calling this or do
1304        // the checks here, to simplify the two code paths regarding the
1305        // essential `validate_account_locks` validation.
1306        //
1307        // Lastly, `validate_account_locks()` is currently called in
1308        // `DefaultTransactionHandler::handle()` via
1309        // `Bank::prepare_unlocked_batch_from_single_tx()` as well.
1310        // This redundancy is known. It was just left as-is out of an abundance
1311        // of caution.
1312        let lock_contexts = transaction
1313            .message()
1314            .account_keys()
1315            .iter()
1316            .enumerate()
1317            .map(|(index, address)| {
1318                LockContext::new(
1319                    usage_queue_loader(*address),
1320                    if transaction.message().is_writable(index) {
1321                        RequestedUsage::Writable
1322                    } else {
1323                        RequestedUsage::Readonly
1324                    },
1325                )
1326            })
1327            .collect();
1328
1329        Task::new(TaskInner {
1330            transaction,
1331            task_id,
1332            lock_contexts,
1333            blocked_usage_count: TokenCell::new(ShortCounter::zero()),
1334            consumed_block_size,
1335        })
1336    }
1337
1338    /// Rewinds the inactive state machine back to its freshly-initialized state
1339    ///
1340    /// This isn't called _reset_, to indicate that it isn't safe to call at any arbitrary moment.
1341    /// This panics if the state machine hasn't properly finished (i.e. there must be no active
1342    /// task), in order to uphold the invariants of [`UsageQueue`]s.
1343    ///
1344    /// This method is intended to allow reusing a `SchedulingStateMachine` instance (to avoid its
1345    /// `unsafe` [constructor](SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling)
1346    /// as much as possible) and its (possibly cached) associated [`UsageQueue`]s for processing
1347    /// other slots.
1348    ///
1349    /// There's a related method called [`clear_and_reinitialize()`](Self::clear_and_reinitialize).
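    ///
    /// A minimal sketch of reusing one instance across slots (hedged illustration: `state_machine`
    /// is a hypothetical caller-side binding and the per-slot scheduling loop is elided):
    ///
    /// ```ignore
    /// // ... schedule and deschedule every task of the current slot ...
    /// assert!(state_machine.has_no_active_task());
    /// state_machine.reinitialize();
    /// // The same instance (and its cached `UsageQueue`s) can now serve the next slot.
    /// ```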
1350    pub fn reinitialize(&mut self) {
1351        assert!(self.has_no_active_task());
1352        assert_eq!(self.running_task_count.current(), 0);
1353        assert_eq!(self.unblocked_task_queue.len(), 0);
1354        // nice trick to ensure all fields are handled here if a new one is added.
1355        let Self {
1356            unblocked_task_queue: _,
1357            active_task_count,
1358            running_task_count: _,
1359            max_running_task_count: _,
1360            handled_task_count,
1361            unblocked_task_count,
1362            total_task_count,
1363            count_token: _,
1364            usage_queue_token: _,
1365            // don't add ".." here
1366        } = self;
1367        active_task_count.reset_to_zero();
1368        handled_task_count.reset_to_zero();
1369        unblocked_task_count.reset_to_zero();
1370        total_task_count.reset_to_zero();
1371    }
1372
1373    /// Clear all buffered tasks and immediately rewind the state machine to be initialized
1374    ///
1375    /// This method _may_ panic if there are tasks which have been scheduled but haven't been
1376    /// descheduled yet (called active tasks). This is due to the invocation of
1377    /// [`reinitialize()`](Self::reinitialize) as the last step. On the other hand, it's guaranteed
1378    /// not to panic otherwise. That's because the first clearing step effectively relaxes the
1379    /// runtime invariant of `reinitialize()` by making the state machine _inactive_ beforehand.
1380    /// After a successful operation, this method returns the number of cleared tasks.
1381    ///
1382    /// Somewhat surprisingly, the clearing logic is the same as the normal (de-)scheduling
1383    /// operation, because that is still the fastest way to clear all tasks, considering the
1384    /// potential later use of [`UsageQueue`]s. That's because the state machine doesn't maintain
1385    /// _a global list_ of tasks; maintaining such a list would incur needless overhead on
1386    /// scheduling, which isn't strictly needed otherwise.
1387    ///
1388    /// Moreover, the descheduling operation is rather heavily optimized to begin with: all
1389    /// collection operations are O(1) over the total number N of addresses accessed by all active
1390    /// tasks, with no amortized memory operations.
1391    ///
1392    /// Whichever algorithm is chosen, the ultimate goal of this operation is to clear all usage
1393    /// queues. Toward that end, one could alternatively create a temporary hash set of
1394    /// [`UsageQueue`]s on the fly. However, that would be costlier than the usual descheduling
1395    /// approach above, due to extra memory operations and many lookups/insertions.
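    ///
    /// A minimal sketch of discarding buffered work (hedged illustration: `state_machine` and
    /// `task` are hypothetical caller-side bindings; the task is buffered by passing
    /// `force_buffering = true`):
    ///
    /// ```ignore
    /// // Buffer the task instead of returning it for execution; `None` means it was buffered.
    /// assert!(state_machine.schedule_or_buffer_task(task, true).is_none());
    /// // Drain and discard the buffered task, then rewind the state machine.
    /// assert_eq!(state_machine.clear_and_reinitialize(), 1);
    /// ```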
1396    pub fn clear_and_reinitialize(&mut self) -> usize {
1397        let mut count = ShortCounter::zero();
1398        while let Some(task) = self.schedule_next_unblocked_task() {
1399            self.deschedule_task(&task);
1400            count.increment_self();
1401        }
1402        self.reinitialize();
1403        count.current().try_into().unwrap()
1404    }
1405
1406    /// Creates a new instance of [`SchedulingStateMachine`] with its `unsafe` fields created as
1407    /// well, thus carrying over `unsafe`.
1408    ///
1409    /// # Safety
1410    /// Call this exactly once for each thread. See [`TokenCell`] for details.
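    ///
    /// # Example
    ///
    /// A minimal sketch (hedged illustration): `None` is treated as effectively no limit for
    /// `max_running_task_count`, per the conversion inside; `state_machine` is the caller's
    /// binding:
    ///
    /// ```ignore
    /// let mut state_machine = unsafe {
    ///     SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling(None)
    /// };
    /// assert!(state_machine.has_no_active_task());
    /// ```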
1411    #[must_use]
1412    pub unsafe fn exclusively_initialize_current_thread_for_scheduling(
1413        max_running_task_count: Option<usize>,
1414    ) -> Self {
1415        // As documented at `CounterInner`, don't expose the rather opinionated choice of unsigned
1416        // integer type (`u32`) to the outside world. So, take the more conventional `usize` and
1417        // convert it to `CounterInner` here, treating `None` as effectively no limit.
1418        let max_running_task_count = max_running_task_count
1419            .unwrap_or(CounterInner::MAX as usize)
1420            .try_into()
1421            .unwrap();
1422
1423        Self {
1424            // It's very unlikely this is desired to be configurable, much like the cap of
1425            // `UsageQueueInner::blocked_usages_from_tasks`.
1426            unblocked_task_queue: VecDeque::with_capacity(1024),
1427            active_task_count: ShortCounter::zero(),
1428            running_task_count: ShortCounter::zero(),
1429            max_running_task_count,
1430            handled_task_count: ShortCounter::zero(),
1431            unblocked_task_count: ShortCounter::zero(),
1432            total_task_count: ShortCounter::zero(),
1433            count_token: unsafe { BlockedUsageCountToken::assume_exclusive_mutating_thread() },
1434            usage_queue_token: unsafe { UsageQueueToken::assume_exclusive_mutating_thread() },
1435        }
1436    }
1437
1438    #[cfg(test)]
1439    unsafe fn exclusively_initialize_current_thread_for_scheduling_for_test() -> Self {
1440        Self::exclusively_initialize_current_thread_for_scheduling(None)
1441    }
1442}
1443
1444#[cfg(test)]
1445mod tests {
1446    use {
1447        super::*,
1448        solana_instruction::{AccountMeta, Instruction},
1449        solana_message::Message,
1450        solana_pubkey::Pubkey,
1451        solana_transaction::{sanitized::SanitizedTransaction, Transaction},
1452        std::{
1453            cell::RefCell,
1454            collections::HashMap,
1455            panic::{catch_unwind, resume_unwind, AssertUnwindSafe},
1456            rc::Rc,
1457        },
1458        test_case::test_matrix,
1459    };
1460
1461    fn simplest_transaction() -> RuntimeTransaction<SanitizedTransaction> {
1462        let message = Message::new(&[], Some(&Pubkey::new_unique()));
1463        let unsigned = Transaction::new_unsigned(message);
1464        RuntimeTransaction::from_transaction_for_tests(unsigned)
1465    }
1466
1467    fn transaction_with_readonly_address(
1468        address: Pubkey,
1469    ) -> RuntimeTransaction<SanitizedTransaction> {
1470        transaction_with_readonly_address_with_payer(address, &Pubkey::new_unique())
1471    }
1472
1473    fn transaction_with_readonly_address_with_payer(
1474        address: Pubkey,
1475        payer: &Pubkey,
1476    ) -> RuntimeTransaction<SanitizedTransaction> {
1477        let instruction = Instruction {
1478            program_id: Pubkey::default(),
1479            accounts: vec![AccountMeta::new_readonly(address, false)],
1480            data: vec![],
1481        };
1482        let message = Message::new(&[instruction], Some(payer));
1483        let unsigned = Transaction::new_unsigned(message);
1484        RuntimeTransaction::from_transaction_for_tests(unsigned)
1485    }
1486
1487    fn transaction_with_writable_address(
1488        address: Pubkey,
1489    ) -> RuntimeTransaction<SanitizedTransaction> {
1490        transaction_with_writable_address_with_payer(address, &Pubkey::new_unique())
1491    }
1492
1493    fn transaction_with_writable_address_with_payer(
1494        address: Pubkey,
1495        payer: &Pubkey,
1496    ) -> RuntimeTransaction<SanitizedTransaction> {
1497        let instruction = Instruction {
1498            program_id: Pubkey::default(),
1499            accounts: vec![AccountMeta::new(address, false)],
1500            data: vec![],
1501        };
1502        let message = Message::new(&[instruction], Some(payer));
1503        let unsigned = Transaction::new_unsigned(message);
1504        RuntimeTransaction::from_transaction_for_tests(unsigned)
1505    }
1506
1507    fn create_address_loader(
1508        usage_queues: Option<Rc<RefCell<HashMap<Pubkey, UsageQueue>>>>,
1509        capability: &Capability,
1510    ) -> impl FnMut(Pubkey) -> UsageQueue + use<'_> {
1511        let usage_queues = usage_queues.unwrap_or_default();
1512        move |address| {
1513            usage_queues
1514                .borrow_mut()
1515                .entry(address)
1516                .or_insert_with(|| UsageQueue::new(capability))
1517                .clone()
1518        }
1519    }
1520
1521    #[test]
1522    fn test_scheduling_state_machine_creation() {
1523        let state_machine = unsafe {
1524            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
1525        };
1526        assert_eq!(state_machine.active_task_count(), 0);
1527        assert_eq!(state_machine.total_task_count(), 0);
1528        assert!(state_machine.has_no_active_task());
1529    }
1530
1531    #[test]
1532    fn test_scheduling_state_machine_good_reinitialization() {
1533        let mut state_machine = unsafe {
1534            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
1535        };
1536        state_machine.total_task_count.increment_self();
1537        assert_eq!(state_machine.total_task_count(), 1);
1538        state_machine.reinitialize();
1539        assert_eq!(state_machine.total_task_count(), 0);
1540    }
1541
1542    #[test_matrix([Capability::FifoQueueing, Capability::PriorityQueueing])]
1543    #[should_panic(expected = "assertion failed: self.has_no_active_task()")]
1544    fn test_scheduling_state_machine_bad_reinitialization(capability: Capability) {
1545        let mut state_machine = unsafe {
1546            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
1547        };
1548        let address_loader = &mut create_address_loader(None, &capability);
1549        let task = SchedulingStateMachine::create_task(simplest_transaction(), 3, address_loader);
1550        state_machine.schedule_task(task.clone()).unwrap();
1551        let bad_reinitialize = catch_unwind(AssertUnwindSafe(|| state_machine.reinitialize()));
1552
1553        // Avoid leaks as dutifully detected by Miri; namely, tasks could be leaked due to
1554        // transient circular references to active tasks held by PriorityUsage at stack unwinding,
1555        // which only happens under known panic conditions.
1556        // To avoid that, deschedule the task after the panic. Doing so beforehand wouldn't trigger
1557        // the panic, which we'd like to test here.
1558        state_machine.deschedule_task(&task);
1559        if let Err(some_panic) = bad_reinitialize {
1560            resume_unwind(some_panic);
1561        }
1562    }
1563
1564    #[test_matrix([Capability::FifoQueueing, Capability::PriorityQueueing])]
1565    fn test_create_task(capability: Capability) {
1566        let sanitized = simplest_transaction();
1567        let signature = *sanitized.signature();
1568        let task = SchedulingStateMachine::create_task(sanitized, 3, &mut |_| {
1569            UsageQueue::new(&capability)
1570        });
1571        assert_eq!(task.task_id(), 3);
1572        assert_eq!(task.transaction().signature(), &signature);
1573    }
1574
1575    #[test_matrix([Capability::FifoQueueing, Capability::PriorityQueueing])]
1576    fn test_non_conflicting_task_related_counts(capability: Capability) {
1577        let sanitized = simplest_transaction();
1578        let address_loader = &mut create_address_loader(None, &capability);
1579        let task = SchedulingStateMachine::create_task(sanitized, 3, address_loader);
1580
1581        let mut state_machine = unsafe {
1582            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
1583        };
1584        let task = state_machine.schedule_task(task).unwrap();
1585        assert_eq!(state_machine.active_task_count(), 1);
1586        assert_eq!(state_machine.total_task_count(), 1);
1587        state_machine.deschedule_task(&task);
1588        assert_eq!(state_machine.active_task_count(), 0);
1589        assert_eq!(state_machine.total_task_count(), 1);
1590        assert!(state_machine.has_no_active_task());
1591    }
1592
1593    #[test_matrix([Capability::FifoQueueing, Capability::PriorityQueueing])]
1594    fn test_conflicting_task_related_counts(capability: Capability) {
1595        let sanitized = simplest_transaction();
1596        let address_loader = &mut create_address_loader(None, &capability);
1597        let task1 = SchedulingStateMachine::create_task(sanitized.clone(), 101, address_loader);
1598        let task2 = SchedulingStateMachine::create_task(sanitized.clone(), 102, address_loader);
1599        let task3 = SchedulingStateMachine::create_task(sanitized.clone(), 103, address_loader);
1600
1601        let mut state_machine = unsafe {
1602            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
1603        };
1604        assert_matches!(
1605            state_machine
1606                .schedule_task(task1.clone())
1607                .map(|t| t.task_id()),
1608            Some(101)
1609        );
1610        assert_matches!(state_machine.schedule_task(task2.clone()), None);
1611
1612        state_machine.deschedule_task(&task1);
1613        assert!(state_machine.has_unblocked_task());
1614        assert_eq!(state_machine.unblocked_task_queue_count(), 1);
1615
1616        // unblocked_task_count() should be incremented only once the unblocked task is actually scheduled
1617        assert_eq!(state_machine.unblocked_task_count(), 0);
1618        assert_eq!(
1619            state_machine
1620                .schedule_next_unblocked_task()
1621                .map(|t| t.task_id()),
1622            Some(102)
1623        );
1624        assert_eq!(state_machine.unblocked_task_count(), 1);
1625
1626        // there's no blocked task anymore; calling schedule_next_unblocked_task should be a no-op and
1627        // shouldn't increment the unblocked_task_count().
1628        assert!(!state_machine.has_unblocked_task());
1629        assert_matches!(state_machine.schedule_next_unblocked_task(), None);
1630        assert_eq!(state_machine.unblocked_task_count(), 1);
1631
1632        assert_eq!(state_machine.unblocked_task_queue_count(), 0);
1633        state_machine.deschedule_task(&task2);
1634
1635        assert_matches!(
1636            state_machine
1637                .schedule_task(task3.clone())
1638                .map(|task| task.task_id()),
1639            Some(103)
1640        );
1641        state_machine.deschedule_task(&task3);
1642        assert!(state_machine.has_no_active_task());
1643    }
1644
1645    #[test_matrix([Capability::FifoQueueing, Capability::PriorityQueueing])]
1646    fn test_existing_blocking_task_then_newly_scheduled_task(capability: Capability) {
1647        let sanitized = simplest_transaction();
1648        let address_loader = &mut create_address_loader(None, &capability);
1649        let task1 = SchedulingStateMachine::create_task(sanitized.clone(), 101, address_loader);
1650        let task2 = SchedulingStateMachine::create_task(sanitized.clone(), 102, address_loader);
1651        let task3 = SchedulingStateMachine::create_task(sanitized.clone(), 103, address_loader);
1652
1653        let mut state_machine = unsafe {
1654            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
1655        };
1656        assert_matches!(
1657            state_machine
1658                .schedule_task(task1.clone())
1659                .map(|t| t.task_id()),
1660            Some(101)
1661        );
1662        assert_matches!(state_machine.schedule_task(task2.clone()), None);
1663
1664        assert_eq!(state_machine.unblocked_task_queue_count(), 0);
1665        state_machine.deschedule_task(&task1);
1666        assert_eq!(state_machine.unblocked_task_queue_count(), 1);
1667
1668        // new task is arriving after task1 is already descheduled and task2 got unblocked
1669        assert_matches!(state_machine.schedule_task(task3.clone()), None);
1670
1671        assert_eq!(state_machine.unblocked_task_count(), 0);
1672        assert_matches!(
1673            state_machine
1674                .schedule_next_unblocked_task()
1675                .map(|t| t.task_id()),
1676            Some(102)
1677        );
1678        assert_eq!(state_machine.unblocked_task_count(), 1);
1679
1680        state_machine.deschedule_task(&task2);
1681
1682        assert_matches!(
1683            state_machine
1684                .schedule_next_unblocked_task()
1685                .map(|t| t.task_id()),
1686            Some(103)
1687        );
1688        assert_eq!(state_machine.unblocked_task_count(), 2);
1689
1690        state_machine.deschedule_task(&task3);
1691        assert!(state_machine.has_no_active_task());
1692    }
1693
1694    #[test_matrix([Capability::FifoQueueing, Capability::PriorityQueueing])]
1695    fn test_multiple_readonly_task_and_counts(capability: Capability) {
1696        let conflicting_address = Pubkey::new_unique();
1697        let sanitized1 = transaction_with_readonly_address(conflicting_address);
1698        let sanitized2 = transaction_with_readonly_address(conflicting_address);
1699        let address_loader = &mut create_address_loader(None, &capability);
1700        let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
1701        let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);
1702
1703        let mut state_machine = unsafe {
1704            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
1705        };
1706        // both read-only tasks should be immediately runnable
1707        assert_matches!(
1708            state_machine
1709                .schedule_task(task1.clone())
1710                .map(|t| t.task_id()),
1711            Some(101)
1712        );
1713        assert_matches!(
1714            state_machine
1715                .schedule_task(task2.clone())
1716                .map(|t| t.task_id()),
1717            Some(102)
1718        );
1719
1720        assert_eq!(state_machine.active_task_count(), 2);
1721        assert_eq!(state_machine.handled_task_count(), 0);
1722        assert_eq!(state_machine.unblocked_task_queue_count(), 0);
1723        state_machine.deschedule_task(&task1);
1724        assert_eq!(state_machine.active_task_count(), 1);
1725        assert_eq!(state_machine.handled_task_count(), 1);
1726        assert_eq!(state_machine.unblocked_task_queue_count(), 0);
1727        state_machine.deschedule_task(&task2);
1728        assert_eq!(state_machine.active_task_count(), 0);
1729        assert_eq!(state_machine.handled_task_count(), 2);
1730        assert!(state_machine.has_no_active_task());
1731    }
1732
1733    #[test_matrix([Capability::FifoQueueing, Capability::PriorityQueueing])]
1734    fn test_all_blocking_readable_tasks_block_writable_task(capability: Capability) {
1735        let conflicting_address = Pubkey::new_unique();
1736        let sanitized1 = transaction_with_readonly_address(conflicting_address);
1737        let sanitized2 = transaction_with_readonly_address(conflicting_address);
1738        let sanitized3 = transaction_with_writable_address(conflicting_address);
1739        let address_loader = &mut create_address_loader(None, &capability);
1740        let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
1741        let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);
1742        let task3 = SchedulingStateMachine::create_task(sanitized3, 103, address_loader);
1743
1744        let mut state_machine = unsafe {
1745            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
1746        };
1747        assert_matches!(
1748            state_machine
1749                .schedule_task(task1.clone())
1750                .map(|t| t.task_id()),
1751            Some(101)
1752        );
1753        assert_matches!(
1754            state_machine
1755                .schedule_task(task2.clone())
1756                .map(|t| t.task_id()),
1757            Some(102)
1758        );
1759        assert_matches!(state_machine.schedule_task(task3.clone()), None);
1760
1761        assert_eq!(state_machine.active_task_count(), 3);
1762        assert_eq!(state_machine.handled_task_count(), 0);
1763        assert_eq!(state_machine.unblocked_task_queue_count(), 0);
1764        state_machine.deschedule_task(&task1);
1765        assert_eq!(state_machine.active_task_count(), 2);
1766        assert_eq!(state_machine.handled_task_count(), 1);
1767        assert_eq!(state_machine.unblocked_task_queue_count(), 0);
1768        assert_matches!(state_machine.schedule_next_unblocked_task(), None);
1769        state_machine.deschedule_task(&task2);
1770        assert_eq!(state_machine.active_task_count(), 1);
1771        assert_eq!(state_machine.handled_task_count(), 2);
1772        assert_eq!(state_machine.unblocked_task_queue_count(), 1);
1773        // task3 is finally unblocked after all of the readonly tasks (task1 and task2) are finished.
1774        assert_matches!(
1775            state_machine
1776                .schedule_next_unblocked_task()
1777                .map(|t| t.task_id()),
1778            Some(103)
1779        );
1780        state_machine.deschedule_task(&task3);
1781        assert!(state_machine.has_no_active_task());
1782    }
1783
1784    #[test_matrix([Capability::FifoQueueing, Capability::PriorityQueueing])]
1785    fn test_readonly_then_writable_then_readonly_linearized(capability: Capability) {
1786        let conflicting_address = Pubkey::new_unique();
1787        let sanitized1 = transaction_with_readonly_address(conflicting_address);
1788        let sanitized2 = transaction_with_writable_address(conflicting_address);
1789        let sanitized3 = transaction_with_readonly_address(conflicting_address);
1790        let address_loader = &mut create_address_loader(None, &capability);
1791        let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
1792        let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);
1793        let task3 = SchedulingStateMachine::create_task(sanitized3, 103, address_loader);
1794
1795        let mut state_machine = unsafe {
1796            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
1797        };
1798        assert_matches!(
1799            state_machine
1800                .schedule_task(task1.clone())
1801                .map(|t| t.task_id()),
1802            Some(101)
1803        );
1804        assert_matches!(state_machine.schedule_task(task2.clone()), None);
1805        assert_matches!(state_machine.schedule_task(task3.clone()), None);
1806
1807        assert_matches!(state_machine.schedule_next_unblocked_task(), None);
1808        state_machine.deschedule_task(&task1);
1809        assert_matches!(
1810            state_machine
1811                .schedule_next_unblocked_task()
1812                .map(|t| t.task_id()),
1813            Some(102)
1814        );
1815        assert_matches!(state_machine.schedule_next_unblocked_task(), None);
1816        state_machine.deschedule_task(&task2);
1817        assert_matches!(
1818            state_machine
1819                .schedule_next_unblocked_task()
1820                .map(|t| t.task_id()),
1821            Some(103)
1822        );
1823        assert_matches!(state_machine.schedule_next_unblocked_task(), None);
1824        state_machine.deschedule_task(&task3);
1825        assert!(state_machine.has_no_active_task());
1826    }
1827
1828    #[test_matrix([Capability::FifoQueueing, Capability::PriorityQueueing])]
1829    fn test_readonly_then_writable(capability: Capability) {
1830        let conflicting_address = Pubkey::new_unique();
1831        let sanitized1 = transaction_with_readonly_address(conflicting_address);
1832        let sanitized2 = transaction_with_writable_address(conflicting_address);
1833        let address_loader = &mut create_address_loader(None, &capability);
1834        let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
1835        let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);
1836
1837        let mut state_machine = unsafe {
1838            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
1839        };
1840        assert_matches!(
1841            state_machine
1842                .schedule_task(task1.clone())
1843                .map(|t| t.task_id()),
1844            Some(101)
1845        );
1846        assert_matches!(state_machine.schedule_task(task2.clone()), None);
1847
1848        // descheduling read-locking task1 should equate to unblocking write-locking task2
1849        state_machine.deschedule_task(&task1);
1850        assert_matches!(
1851            state_machine
1852                .schedule_next_unblocked_task()
1853                .map(|t| t.task_id()),
1854            Some(102)
1855        );
1856        state_machine.deschedule_task(&task2);
1857        assert!(state_machine.has_no_active_task());
1858    }
1859
1860    #[test_matrix([Capability::FifoQueueing, Capability::PriorityQueueing])]
1861    fn test_blocked_tasks_writable_2_readonly_then_writable(capability: Capability) {
1862        let conflicting_address = Pubkey::new_unique();
1863        let sanitized1 = transaction_with_writable_address(conflicting_address);
1864        let sanitized2 = transaction_with_readonly_address(conflicting_address);
1865        let sanitized3 = transaction_with_readonly_address(conflicting_address);
1866        let sanitized4 = transaction_with_writable_address(conflicting_address);
1867        let address_loader = &mut create_address_loader(None, &capability);
1868        let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
1869        let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);
1870        let task3 = SchedulingStateMachine::create_task(sanitized3, 103, address_loader);
1871        let task4 = SchedulingStateMachine::create_task(sanitized4, 104, address_loader);
1872
1873        let mut state_machine = unsafe {
1874            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
1875        };
1876        assert_matches!(
1877            state_machine
1878                .schedule_task(task1.clone())
1879                .map(|t| t.task_id()),
1880            Some(101)
1881        );
1882        assert_matches!(state_machine.schedule_task(task2.clone()), None);
1883        assert_matches!(state_machine.schedule_task(task3.clone()), None);
1884        assert_matches!(state_machine.schedule_task(task4.clone()), None);
1885
1886        state_machine.deschedule_task(&task1);
1887        assert_matches!(
1888            state_machine
1889                .schedule_next_unblocked_task()
1890                .map(|t| t.task_id()),
1891            Some(102)
1892        );
1893        assert_matches!(
1894            state_machine
1895                .schedule_next_unblocked_task()
1896                .map(|t| t.task_id()),
1897            Some(103)
1898        );
1899        // the above deschedule_task(task1) call should only unblock task2 and task3 because these
1900        // are read-locking, and shouldn't unblock task4 because it's write-locking
1901        assert_matches!(state_machine.schedule_next_unblocked_task(), None);
1902
1903        state_machine.deschedule_task(&task2);
1904        // still task4 is blocked...
1905        assert_matches!(state_machine.schedule_next_unblocked_task(), None);
1906
1907        state_machine.deschedule_task(&task3);
1908        // finally task4 should be unblocked
1909        assert_matches!(
1910            state_machine
1911                .schedule_next_unblocked_task()
1912                .map(|t| t.task_id()),
1913            Some(104)
1914        );
1915        state_machine.deschedule_task(&task4);
1916        assert!(state_machine.has_no_active_task());
1917    }
1918
1919    #[test_matrix([Capability::FifoQueueing, Capability::PriorityQueueing])]
1920    fn test_gradual_locking(capability: Capability) {
1921        let conflicting_address = Pubkey::new_unique();
1922        let sanitized1 = transaction_with_writable_address(conflicting_address);
1923        let sanitized2 = transaction_with_writable_address(conflicting_address);
1924        let usage_queues = Rc::new(RefCell::new(HashMap::new()));
1925        let address_loader = &mut create_address_loader(Some(usage_queues.clone()), &capability);
1926        let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
1927        let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);
1928
1929        let mut state_machine = unsafe {
1930            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
1931        };
1932        assert_matches!(
1933            state_machine
1934                .schedule_task(task1.clone())
1935                .map(|t| t.task_id()),
1936            Some(101)
1937        );
1938        assert_matches!(state_machine.schedule_task(task2.clone()), None);
1939        let usage_queues = usage_queues.borrow_mut();
1940        let usage_queue = usage_queues.get(&conflicting_address).unwrap();
1941        usage_queue
1942            .0
1943            .with_borrow_mut(&mut state_machine.usage_queue_token, |usage_queue| {
1944                assert_matches!(usage_queue.current_usage(), Some(RequestedUsage::Writable));
1945            });
1946        // task2's fee payer should already have been locked by the above schedule_task(task2)
1947        // call, even though task2 itself is still blocked
1948        let fee_payer = task2.transaction().message().fee_payer();
1949        let usage_queue = usage_queues.get(fee_payer).unwrap();
1950        usage_queue
1951            .0
1952            .with_borrow_mut(&mut state_machine.usage_queue_token, |usage_queue| {
1953                assert_matches!(usage_queue.current_usage(), Some(RequestedUsage::Writable));
1954            });
1955        state_machine.deschedule_task(&task1);
1956        assert_matches!(
1957            state_machine
1958                .schedule_next_unblocked_task()
1959                .map(|t| t.task_id()),
1960            Some(102)
1961        );
1962        state_machine.deschedule_task(&task2);
1963        assert!(state_machine.has_no_active_task());
1964    }
1965
1966    #[test_matrix([Capability::FifoQueueing, Capability::PriorityQueueing])]
1967    #[should_panic(expected = "internal error: entered unreachable code")]
1968    fn test_unreachable_unlock_conditions1(capability: Capability) {
1969        let mut state_machine = unsafe {
1970            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
1971        };
1972        let usage_queue = UsageQueue::new(&capability);
1973        let sanitized = simplest_transaction();
1974        let task = &SchedulingStateMachine::create_task(sanitized, 3, &mut |_| {
1975            UsageQueue::new(&capability)
1976        });
1977        usage_queue
1978            .0
1979            .with_borrow_mut(&mut state_machine.usage_queue_token, |usage_queue| {
1980                let _ = usage_queue.unlock(task, RequestedUsage::Writable);
1981            });
1982    }
1983
1984    #[test_matrix([Capability::FifoQueueing, Capability::PriorityQueueing])]
1985    #[should_panic(
1986        expected = "assertion failed: `Readonly` does not match `RequestedUsage::Writable`"
1987    )]
1988    fn test_unreachable_unlock_conditions2(capability: Capability) {
1989        let mut state_machine = unsafe {
1990            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
1991        };
1992        let usage_queue = UsageQueue::new(&capability);
1993        let sanitized = simplest_transaction();
1994        let task = &SchedulingStateMachine::create_task(sanitized, 3, &mut |_| {
1995            UsageQueue::new(&capability)
1996        });
1997        usage_queue
1998            .0
1999            .with_borrow_mut(&mut state_machine.usage_queue_token, |usage_queue| {
2000                usage_queue.update_current_usage(RequestedUsage::Writable, task);
2001                let _ = usage_queue.unlock(task, RequestedUsage::Readonly);
2002            });
2003    }
2004
2005    #[test_matrix([Capability::FifoQueueing, Capability::PriorityQueueing])]
2006    #[should_panic(expected = "internal error: entered unreachable code")]
2007    fn test_unreachable_unlock_conditions3(capability: Capability) {
2008        let mut state_machine = unsafe {
2009            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
2010        };
2011        let usage_queue = UsageQueue::new(&capability);
2012        let sanitized = simplest_transaction();
2013        let task = &SchedulingStateMachine::create_task(sanitized, 3, &mut |_| {
2014            UsageQueue::new(&capability)
2015        });
2016        usage_queue
2017            .0
2018            .with_borrow_mut(&mut state_machine.usage_queue_token, |usage_queue| {
2019                usage_queue.update_current_usage(RequestedUsage::Readonly, task);
2020                let _ = usage_queue.unlock(task, RequestedUsage::Writable);
2021            });
2022    }
2023
2024    mod reblocking {
2025        use super::{RequestedUsage::*, *};
2026
2027        #[track_caller]
2028        fn assert_task_index(actual: Option<Task>, expected: Option<OrderedTaskId>) {
2029            assert_eq!(actual.map(|task| task.task_id()), expected);
2030        }
2031
2032        macro_rules! assert_task_index {
2033            ($left:expr, $right:expr) => {
2034                assert_task_index($left, $right);
2035            };
2036        }
2037
2038        fn setup() -> (
2039            SchedulingStateMachine,
2040            impl FnMut((RequestedUsage, Pubkey), OrderedTaskId) -> Task,
2041            Task,
2042        ) {
2043            let mut state_machine = unsafe {
2044                SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
2045            };
2046
2047            let payer = Pubkey::new_unique();
2048            let mut address_loader = create_address_loader(None, &Capability::PriorityQueueing);
2049
2050            let mut create_task = move |(requested_usage, address), task_id| match requested_usage {
2051                RequestedUsage::Readonly => SchedulingStateMachine::create_task(
2052                    transaction_with_readonly_address_with_payer(address, &payer),
2053                    task_id,
2054                    &mut address_loader,
2055                ),
2056                RequestedUsage::Writable => SchedulingStateMachine::create_task(
2057                    transaction_with_writable_address_with_payer(address, &payer),
2058                    task_id,
2059                    &mut address_loader,
2060                ),
2061            };
2062
2063            let t0_block_others = create_task((Writable, Pubkey::new_unique()), 100);
2064            assert_task_index!(
2065                state_machine.schedule_task(t0_block_others.clone()),
2066                Some(100)
2067            );
2068
2069            (state_machine, create_task, t0_block_others)
2070        }
2071
2072        #[test]
2073        fn test_reblocked_tasks_lower_write_then_higher_write() {
2074            let (mut s, mut create_task, t0_block_others) = setup();
2075
2076            let reblocked_address = Pubkey::new_unique();
2077            let t1_reblocked = create_task((Writable, reblocked_address), 102);
2078            let t2_force_locked = create_task((Writable, reblocked_address), 10);
2079
2080            assert_task_index!(s.schedule_task(t1_reblocked.clone()), None);
2081            assert_task_index!(s.schedule_task(t2_force_locked.clone()), None);
2082
2083            s.deschedule_task(&t0_block_others);
2084            assert_task_index!(s.schedule_next_unblocked_task(), Some(10));
2085            s.deschedule_task(&t2_force_locked);
2086            assert_task_index!(s.schedule_next_unblocked_task(), Some(102));
2087            s.deschedule_task(&t1_reblocked);
2088            assert!(s.has_no_active_task());
2089        }
2090
2091        #[test]
2092        fn test_reblocked_tasks_lower_write_then_higher_read() {
2093            let (mut s, mut create_task, t0_block_others) = setup();
2094
2095            let reblocked_address = Pubkey::new_unique();
2096            let t1_reblocked = create_task((Writable, reblocked_address), 102);
2097            let t2_force_locked = create_task((Readonly, reblocked_address), 10);
2098
2099            assert_task_index!(s.schedule_task(t1_reblocked.clone()), None);
2100            assert_task_index!(s.schedule_task(t2_force_locked.clone()), None);
2101
2102            s.deschedule_task(&t0_block_others);
2103            assert_task_index!(s.schedule_next_unblocked_task(), Some(10));
2104            s.deschedule_task(&t2_force_locked);
2105            assert_task_index!(s.schedule_next_unblocked_task(), Some(102));
2106            s.deschedule_task(&t1_reblocked);
2107            assert!(s.has_no_active_task());
2108        }
2109
2110        #[test]
2111        fn test_reblocked_tasks_lower_read_then_higher_read() {
2112            let (mut s, mut create_task, t0_block_others) = setup();
2113
2114            let reblocked_address = Pubkey::new_unique();
2115            let t1_not_reblocked = create_task((Readonly, reblocked_address), 102);
2116            let t2_skipped = create_task((Writable, reblocked_address), 103);
2117            let t3_force_locked = create_task((Readonly, reblocked_address), 10);
2118
2119            assert_task_index!(s.schedule_task(t1_not_reblocked.clone()), None);
2120            assert_task_index!(s.schedule_task(t2_skipped.clone()), None);
2121            assert_task_index!(s.schedule_task(t3_force_locked.clone()), None);
2122
2123            s.deschedule_task(&t0_block_others);
2124            assert_task_index!(s.schedule_next_unblocked_task(), Some(10));
2125            s.deschedule_task(&t3_force_locked);
2126            assert_task_index!(s.schedule_next_unblocked_task(), Some(102));
2127            s.deschedule_task(&t1_not_reblocked);
2128            assert_task_index!(s.schedule_next_unblocked_task(), Some(103));
2129            s.deschedule_task(&t2_skipped);
2130            assert!(s.has_no_active_task());
2131        }
2132
2133        #[test]
2134        fn test_reblocked_tasks_lower_read_then_higher_write_full() {
2135            let (mut s, mut create_task, t0_block_others) = setup();
2136
2137            let reblocked_address = Pubkey::new_unique();
2138            let t1_reblocked = create_task((Readonly, reblocked_address), 102);
2139            let t2_force_locked = create_task((Writable, reblocked_address), 10);
2140
2141            assert_task_index!(s.schedule_task(t1_reblocked.clone()), None);
2142            assert_task_index!(s.schedule_task(t2_force_locked.clone()), None);
2143
2144            s.deschedule_task(&t0_block_others);
2145            assert_task_index!(s.schedule_next_unblocked_task(), Some(10));
2146            s.deschedule_task(&t2_force_locked);
2147            assert_task_index!(s.schedule_next_unblocked_task(), Some(102));
2148            s.deschedule_task(&t1_reblocked);
2149            assert!(s.has_no_active_task());
2150        }
2151    }
2152}