solana_unified_scheduler_logic/
lib.rs

1#![allow(rustdoc::private_intra_doc_links)]
2//! The task (transaction) scheduling code for the unified scheduler
3//!
4//! ### High-level API and design
5//!
6//! The most important type is [`SchedulingStateMachine`]. It takes new tasks (= transactions) and
7//! may return them back if they are runnable via
8//! [`::schedule_task()`](SchedulingStateMachine::schedule_task) while maintaining the account
9//! readonly/writable lock rules. Those returned runnable tasks are guaranteed to be safe to
10//! execute in parallel. Lastly, `SchedulingStateMachine` should be notified about the completion
11//! of the execution via [`::deschedule_task()`](SchedulingStateMachine::deschedule_task), so that
12//! conflicting tasks can be returned from
13//! [`::schedule_next_unblocked_task()`](SchedulingStateMachine::schedule_next_unblocked_task) as
14//! newly-unblocked runnable ones.
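//!
//! Below is a minimal sketch of this flow. It assumes a single dedicated scheduler thread (hence
//! the `unsafe` constructor) and elides transaction construction, so it is illustrative rather
//! than a drop-in doctest:
//!
//! ```ignore
//! // Safety: this must be the only state machine created on this thread.
//! let mut state_machine = unsafe {
//!     SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling(None)
//! };
//!
//! // `transaction` is assumed to be a `RuntimeTransaction<SanitizedTransaction>` built elsewhere.
//! // A real loader must return the same `UsageQueue` for the same pubkey across tasks.
//! let task = SchedulingStateMachine::create_task(transaction, 0, &mut |_pubkey| {
//!     UsageQueue::default()
//! });
//!
//! // A non-conflicting task is returned immediately as runnable.
//! if let Some(task) = state_machine.schedule_task(task) {
//!     // ... execute the task on a handler thread ...
//!     state_machine.deschedule_task(&task);
//! }
//!
//! // Conflicting tasks blocked earlier are drained here after descheduling.
//! while let Some(task) = state_machine.schedule_next_unblocked_task() {
//!     state_machine.deschedule_task(&task);
//! }
//! ```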
15//!
16//! The design principle of this crate (`solana-unified-scheduler-logic`) is simplicity for the
17//! sake of separation of concerns. `solana-unified-scheduler-pool` interacts with it only through
18//! a small public API. This crate doesn't know about banks, slots, solana-runtime, threads, or
19//! crossbeam-channel at all. Because of this, it's deterministic, easy to unit-test, and
20//! its perf footprint is well understood. It really focuses on its single job: sorting
21//! transactions in executable order.
22//!
23//! ### Algorithm
24//!
25//! The algorithm is based on per-address FIFO queues, which are updated both every time a new
26//! task arrives (= called _scheduling_) and every time a runnable (= _post-scheduling_) task
27//! finishes (= called _descheduling_).
28//!
29//! For the _non-conflicting scheduling_ case, the story is very simple; it just remembers that all
30//! of the accessed addresses are write-locked or read-locked, along with the number of active (=
31//! _currently-scheduled-and-not-descheduled-yet_) tasks. Correspondingly, descheduling does the
32//! opposite book-keeping process, regardless of whether the finished task was conflicting or not.
33//!
34//! For the _conflicting scheduling_ case, each of the **non-conflicting addresses** is recorded
35//! like in the non-conflicting case above. As for the **conflicting addresses**, the task is
36//! recorded in the respective FIFO queues attached to those (conflicting) addresses. Importantly,
37//! the number of conflicting addresses of the conflicting task is also remembered.
38//!
39//! The last missing piece is that the scheduler actually tries to reschedule previously blocked
40//! tasks while descheduling, in addition to the above-mentioned book-keeping processing. Namely,
41//! when a given address becomes ready for fresh locking as a result of descheduling a task (i.e. the
42//! write lock is released or the read lock count has reached zero), it pops out the first element of
43//! the FIFO blocked-task queue of the address. Then, it immediately marks the address as relocked. It
44//! also decrements the number of conflicting addresses of the popped-out task. As the final step,
45//! if the number reaches zero, it means the task has fully finished locking all of its
46//! addresses and is directly routed to be runnable. Lastly, if the next first element of the
47//! blocked-task queue is trying to read-lock the address like the popped-out one, this
48//! rescheduling is repeated as an optimization to increase parallelism of task execution.
49//!
50//! Put differently, this algorithm tries to gradually lock all of the addresses of tasks at
51//! different timings while not deviating the execution order from the original task ingestion
52//! order. This implies there are no locking retries in general, which are the primary source of
53//! non-linear perf degradation.
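//!
//! To make the blocking and unblocking concrete, here is a hedged sketch, continuing the example
//! above, of two tasks that write-lock the same address (mirroring this crate's unit tests; task
//! construction elided):
//!
//! ```ignore
//! // task1 and task2 are assumed to both request a write lock on one shared address.
//! let task1 = state_machine.schedule_task(task1).unwrap(); // runnable right away
//! assert!(state_machine.schedule_task(task2).is_none());   // blocked behind task1
//!
//! // Descheduling task1 releases the write lock and moves task2 to the unblocked queue.
//! state_machine.deschedule_task(&task1);
//! let task2 = state_machine.schedule_next_unblocked_task().unwrap();
//! state_machine.deschedule_task(&task2);
//! ```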
54//!
55//! As a ballpark number from a synthesized micro benchmark on a typical CPU for `mainnet-beta`
56//! validators, it takes roughly 100ns to schedule and deschedule a transaction with 10 accounts,
57//! and roughly 1us for a transaction with 100 accounts. Note that this entirely excludes crossbeam
58//! communication overhead. That said, it's not unrealistic to say the whole unified scheduler can
59//! attain 100k-1m tps overall, assuming those transaction executions aren't bottlenecked.
60//!
61//! ### Runtime performance characteristics and data structure arrangement
62//!
63//! Its algorithm is very fast for high throughput and real-time for low latency. The whole
64//! unified-scheduler architecture is designed from the ground up to support the fastest execution
65//! of this scheduling code. To that end, the unified scheduler pre-loads address-specific locking
66//! state data structures (called [`UsageQueue`]) for all of a transaction's accounts, in order to
67//! offload that job from the scheduler thread to other threads. This preloading is done inside
68//! [`create_task()`](SchedulingStateMachine::create_task). In this way, task scheduling
69//! computational complexity is basically reduced to several word-sized loads and stores in the
70//! scheduler thread (i.e. constant; no allocations nor syscalls), while being proportional to the
71//! number of addresses in a given transaction. Note that this statement holds true regardless
72//! of conflicts. This is because the preloading also pre-allocates some scratch-pad area
73//! ([`blocked_usages_from_tasks`](UsageQueueInner::blocked_usages_from_tasks)) to stash blocked
74//! ones. So, a conflict only incurs some additional fixed number of mem stores, within the error
75//! margin of the constant complexity. And the additional memory allocation for the scratchpad could
76//! be said to be amortized, if such an unusual event should ever occur.
77//!
78//! [`Arc`] is used to implement this preloading mechanism, because `UsageQueue`s are shared across
79//! tasks accessing the same account, and among threads due to the preloading. Also, interior
80//! mutability is needed. However, `SchedulingStateMachine` doesn't use conventional locks like
81//! RwLock. Leveraging the fact that it's the only state-mutating exclusive thread, it instead uses
82//! `UnsafeCell`, which is sugar-coated by a tailored wrapper called [`TokenCell`]. `TokenCell`
83//! imposes an overly restrictive aliasing rule via the Rust type system to maintain memory safety.
84//! By localizing any synchronization to the message passing, the scheduling code itself attains
85//! maximally possible single-threaded execution without stalling cpu pipelines at all, only
86//! constrained by mem access latency, while efficiently utilizing the L1-L3 cpu caches filled with
87//! `UsageQueue`s.
88//!
89//! ### Buffer bloat insignificance
90//!
91//! The scheduler code itself doesn't care about the buffer bloat problem, which can occur in the
92//! unified scheduler, where a run of heavily linearized and blocked tasks could be severely
93//! hampered by a very large number of interleaved runnable tasks alongside. The reason is again
94//! separation of concerns. This is acceptable because the scheduling code itself isn't
95//! susceptible to the buffer bloat problem, as explained by the description above and validated
96//! by the mentioned benchmark. Thus, this should be solved elsewhere, specifically at the
97//! scheduler pool.
98use {
99    crate::utils::{ShortCounter, Token, TokenCell},
100    assert_matches::assert_matches,
101    solana_pubkey::Pubkey,
102    solana_runtime_transaction::runtime_transaction::RuntimeTransaction,
103    solana_transaction::sanitized::SanitizedTransaction,
104    static_assertions::const_assert_eq,
105    std::{collections::VecDeque, mem, sync::Arc},
106    unwrap_none::UnwrapNone,
107};
108
109#[derive(Clone, Copy, Debug, PartialEq)]
110pub enum SchedulingMode {
111    BlockVerification,
112    BlockProduction,
113}
114
115/// This type alias is intentionally not exposed to the public API with `pub`. The choice of
116/// explicit `u32`, rather than the more neutral `usize`, is an implementation detail to squeeze
117/// the CPU-cache footprint as much as possible.
118/// Note that usage of `u32` is safe because `SchedulingStateMachine` is expected to be
119/// `reinitialize()`-d rather quickly after a short period: 1 slot for block verification, 4 (or up
120/// to 8) consecutive slots for block production.
121type CounterInner = u32;
122
123/// Internal utilities. Namely this contains [`ShortCounter`] and [`TokenCell`].
124mod utils {
125    use {
126        crate::CounterInner,
127        std::{
128            any::{self, TypeId},
129            cell::{RefCell, UnsafeCell},
130            collections::BTreeSet,
131            marker::PhantomData,
132            thread,
133        },
134    };
135
136    /// A really tiny counter to hide `.checked_{add,sub}` all over the place.
137    ///
138    /// It's the caller's responsibility to ensure this (backed by [`CounterInner`]) never overflows.
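    ///
    /// A minimal illustration; the counter panics on overflow/underflow instead of wrapping:
    ///
    /// ```ignore
    /// let mut count = ShortCounter::zero();
    /// count.increment_self();
    /// assert!(count.is_one());
    /// count.decrement_self();
    /// assert!(count.is_zero());
    /// ```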
139    #[derive(Debug, Clone, Copy)]
140    pub(super) struct ShortCounter(CounterInner);
141
142    impl ShortCounter {
143        pub(super) fn zero() -> Self {
144            Self(0)
145        }
146
147        pub(super) fn one() -> Self {
148            Self(1)
149        }
150
151        pub(super) fn is_one(&self) -> bool {
152            self.0 == 1
153        }
154
155        pub(super) fn is_zero(&self) -> bool {
156            self.0 == 0
157        }
158
159        pub(super) fn current(&self) -> CounterInner {
160            self.0
161        }
162
163        #[must_use]
164        pub(super) fn increment(self) -> Self {
165            Self(self.0.checked_add(1).unwrap())
166        }
167
168        #[must_use]
169        pub(super) fn decrement(self) -> Self {
170            Self(self.0.checked_sub(1).unwrap())
171        }
172
173        pub(super) fn increment_self(&mut self) -> &mut Self {
174            *self = self.increment();
175            self
176        }
177
178        pub(super) fn decrement_self(&mut self) -> &mut Self {
179            *self = self.decrement();
180            self
181        }
182
183        pub(super) fn reset_to_zero(&mut self) -> &mut Self {
184            self.0 = 0;
185            self
186        }
187    }
188
189    /// A conditionally [`Send`]-able and [`Sync`]-able cell leveraging scheduler's one-by-one data
190    /// access pattern with zero runtime synchronization cost.
191    ///
192    /// To comply with Rust's aliasing rules, these cells require a carefully-created [`Token`] to
193    /// be passed around to access the inner values. The token is a special-purpose phantom object
194    /// to get rid of its inherent `unsafe`-ness in [`UnsafeCell`], which is internally used for
195    /// the interior mutability.
196    ///
197    /// The final objective of [`Token`] is to ensure there's only one mutable reference to the
198    /// [`TokenCell`] at most _at any given moment_. To that end, it's `unsafe` to create it,
199    /// shifting the responsibility of binding the only singleton instance to a particular thread
200    /// and not creating more than one, onto the API consumers. And its constructor is non-`const`,
201    /// and the type is `!Clone` (and `!Copy` as well), `!Default`, `!Send` and `!Sync` to make it
202    /// relatively hard to cross thread boundaries accidentally.
203    ///
204    /// In other words, the token semantically _owns_ all of its associated instances of
205    /// [`TokenCell`]s. And `&mut Token` is needed to access one of them, as if through one of
206    /// [`Token`]'s `*_mut()` getters. Thus, the Rust aliasing rule for `UnsafeCell` can
207    /// transitively be proven to be satisfied simply based on the usual borrow checking of the
208    /// `&mut` reference of [`Token`] itself via
209    /// [`::with_borrow_mut()`](TokenCell::with_borrow_mut).
210    ///
211    /// By extension, it's allowed to create _multiple_ tokens in a _single_ process as long as no
212    /// instance of [`TokenCell`] is shared by multiple instances of [`Token`].
213    ///
214    /// Note that this is overly restrictive in that it's forbidden, yet technically possible,
215    /// to _have multiple mutable references to the inner values at the same time, if and only
216    /// if the respective cells aren't aliased to each other (i.e. different instances)_. This
217    /// artificial restriction is acceptable for its intended use by the unified scheduler's code
218    /// because its algorithm only needs to access each instance of [`TokenCell`]-ed data once at a
219    /// time. Finally, this restriction is traded off for restoration of the Rust aliasing rule at
220    /// zero runtime cost. Without this token mechanism, there's no way to realize this.
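    ///
    /// A minimal sketch of the intended usage pattern (the `Vec<u8>` payload is just a
    /// placeholder for illustration):
    ///
    /// ```ignore
    /// // Safety: this must be the only `Token<Vec<u8>>` created on this thread.
    /// let mut token = unsafe { Token::<Vec<u8>>::assume_exclusive_mutating_thread() };
    /// let cell = TokenCell::new(Vec::new());
    /// cell.with_borrow_mut(&mut token, |v| v.push(1));
    /// ```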
221    #[derive(Debug, Default)]
222    pub(super) struct TokenCell<V>(UnsafeCell<V>);
223
224    impl<V> TokenCell<V> {
225        /// Creates a new `TokenCell` with the `value` typed as `V`.
226        ///
227        /// Note that this isn't parametric over its accompanying `Token`'s lifetime to avoid
228        /// complex handling of non-`'static` heaped data in general. Instead, it's manually
229        /// required to ensure this instance is accessed only via its associated Token for its
230        /// entire lifetime.
231        ///
232        /// This is intentionally left to be non-`const` to forbid unprotected sharing via static
233        /// variables among threads.
234        pub(super) fn new(value: V) -> Self {
235            Self(UnsafeCell::new(value))
236        }
237
238        /// Acquires a mutable reference inside a given closure, while borrowing the mutable
239        /// reference of the given token.
240        ///
241        /// In this way, any additional reborrow can never happen at the same time across all
242        /// instances of [`TokenCell<V>`] conceptually owned by the instance of [`Token<V>`] (a
243        /// particular thread), unless the previous borrow is released. After the release, the used
244        /// singleton token should be free to be reused for reborrows.
245        ///
246        /// Note that the lifetime of the acquired reference is still restricted to `'self`, not
247        /// `'token`, in order to avoid use-after-free undefined behaviors.
248        pub(super) fn with_borrow_mut<R>(
249            &self,
250            _token: &mut Token<V>,
251            f: impl FnOnce(&mut V) -> R,
252        ) -> R {
253            f(unsafe { &mut *self.0.get() })
254        }
255    }
256
257    // Safety: Once a (`Send`-able) `TokenCell` has been transferred to a thread from other
258    // threads, access to `TokenCell` is assumed to be only from that single thread by proper use of
259    // Token. Thereby, implementing `Sync` can be thought of as safe and doing so is needed for the
260    // particular implementation pattern in the unified scheduler (multi-threaded off-loading).
261    //
262    // In other words, TokenCell is technically still `!Sync`. But there should be no
263    // legitimate usage which depends on real `Sync`, to avoid undefined behaviors.
264    unsafe impl<V> Sync for TokenCell<V> {}
265
266    /// An auxiliary zero-sized type to enforce the aliasing rule for [`TokenCell`] via the Rust type system.
267    ///
268    /// Token semantically owns a collection of `TokenCell` objects and governs the _unique_
269    /// existence of mutable access over them by requiring the token itself to be mutably borrowed
270    /// to get a mutable reference to the internal value of `TokenCell`.
271    // *mut is used to make this type !Send and !Sync
272    pub(super) struct Token<V: 'static>(PhantomData<*mut V>);
273
274    impl<V> Token<V> {
275        /// Returns the token to acquire a mutable reference to the inner value of [TokenCell].
276        ///
277        /// This is intentionally left to be non-`const` to forbid unprotected sharing via static
278        /// variables among threads.
279        ///
280        /// # Panics
281        ///
282        /// This function will `panic!()` if called multiple times with the same type `V` from the
283        /// same thread, to detect potential misuses.
284        ///
285        /// # Safety
286        ///
287        /// This method should be called at most once for each thread to avoid undefined
288        /// behavior when used with [`Token`].
289        #[must_use]
290        pub(super) unsafe fn assume_exclusive_mutating_thread() -> Self {
291            thread_local! {
292                static TOKENS: RefCell<BTreeSet<TypeId>> = const { RefCell::new(BTreeSet::new()) };
293            }
294            // TOKENS.with_borrow_mut can't panic because it's the only non-overlapping
295            // bound-to-local-variable borrow of the _thread local_ variable.
296            assert!(
297                TOKENS.with_borrow_mut(|tokens| tokens.insert(TypeId::of::<Self>())),
298                "{:?} is wrongly initialized twice on {:?}",
299                any::type_name::<Self>(),
300                thread::current()
301            );
302
303            Self(PhantomData)
304        }
305    }
306
307    #[cfg(test)]
308    mod tests {
309        use {
310            super::{Token, TokenCell},
311            std::{mem, sync::Arc, thread},
312        };
313
314        #[test]
315        #[should_panic(
316            expected = "\"solana_unified_scheduler_logic::utils::Token<usize>\" is wrongly \
317                        initialized twice on Thread"
318        )]
319        fn test_second_creation_of_tokens_in_a_thread() {
320            unsafe {
321                let _ = Token::<usize>::assume_exclusive_mutating_thread();
322                let _ = Token::<usize>::assume_exclusive_mutating_thread();
323            }
324        }
325
326        #[derive(Debug)]
327        struct FakeQueue {
328            v: Vec<u8>,
329        }
330
331        // As documented above, it's illegal to create multiple tokens inside a single thread to
332        // acquire multiple mutable references to the same TokenCell at the same time.
333        #[test]
334        // Trigger (harmless) UB unless running under miri by conditionally #[ignore]-ing,
335        // confirming false-positive result to conversely show the merit of miri!
336        #[cfg_attr(miri, ignore)]
337        fn test_ub_illegally_created_multiple_tokens() {
338            // Unauthorized token minting!
339            let mut token1 = unsafe { mem::transmute::<(), Token<FakeQueue>>(()) };
340            let mut token2 = unsafe { mem::transmute::<(), Token<FakeQueue>>(()) };
341
342            let queue = TokenCell::new(FakeQueue {
343                v: Vec::with_capacity(20),
344            });
345            queue.with_borrow_mut(&mut token1, |queue_mut1| {
346                queue_mut1.v.push(1);
347                queue.with_borrow_mut(&mut token2, |queue_mut2| {
348                    queue_mut2.v.push(2);
349                    queue_mut1.v.push(3);
350                });
351                queue_mut1.v.push(4);
352            });
353
354            // It's already UB, so we can't assert reliably; dbg!(...) just for fun
355            #[cfg(not(miri))]
356            dbg!(queue.0.into_inner());
357
358            // Return successfully to indicate an unexpected outcome, because this test should
359            // have aborted by now.
360        }
361
362        // As documented above, it's illegal to share (= co-own) the same instance of TokenCell
363        // across threads. Unfortunately, we can't prevent this from happening with some
364        // type-safety magic to cause compile errors... So, as a sanity check, this test fails due
365        // to a runtime error from the known UB when run under miri.
366        #[test]
367        // Trigger (harmless) UB unless running under miri by conditionally #[ignore]-ing,
368        // confirming false-positive result to conversely show the merit of miri!
369        #[cfg_attr(miri, ignore)]
370        fn test_ub_illegally_shared_token_cell() {
371            let queue1 = Arc::new(TokenCell::new(FakeQueue {
372                v: Vec::with_capacity(20),
373            }));
374            let queue2 = queue1.clone();
375            #[cfg(not(miri))]
376            let queue3 = queue1.clone();
377
378            // Usually miri immediately detects the data race; but just repeat enough times to avoid
379            // being flaky
380            for _ in 0..10 {
381                let (queue1, queue2) = (queue1.clone(), queue2.clone());
382                let thread1 = thread::spawn(move || {
383                    let mut token = unsafe { Token::assume_exclusive_mutating_thread() };
384                    queue1.with_borrow_mut(&mut token, |queue| {
385                        // this is UB
386                        queue.v.push(3);
387                    });
388                });
389                // Immediately spawn the next thread without joining thread1 to ensure there's
390                // definitely a data race. Otherwise, joining here wouldn't cause UB.
391                let thread2 = thread::spawn(move || {
392                    let mut token = unsafe { Token::assume_exclusive_mutating_thread() };
393                    queue2.with_borrow_mut(&mut token, |queue| {
394                        // this is UB
395                        queue.v.push(4);
396                    });
397                });
398
399                thread1.join().unwrap();
400                thread2.join().unwrap();
401            }
402
403            // It's already UB, so we can't assert reliably; dbg!(...) just for fun
404            #[cfg(not(miri))]
405            {
406                drop((queue1, queue2));
407                dbg!(Arc::into_inner(queue3).unwrap().0.into_inner());
408            }
409
410            // Return successfully to indicate an unexpected outcome, because this test should
411            // have aborted by now
412        }
413    }
414}
415
416/// [`Result`] for locking a [usage_queue](UsageQueue) with particular
417/// [current_usage](RequestedUsage).
418type LockResult = Result<(), ()>;
419const_assert_eq!(mem::size_of::<LockResult>(), 1);
420
421/// Something to be scheduled; usually a wrapper of [`SanitizedTransaction`].
422pub type Task = Arc<TaskInner>;
423const_assert_eq!(mem::size_of::<Task>(), 8);
424
425pub type BlockSize = usize;
426pub const NO_CONSUMED_BLOCK_SIZE: BlockSize = 0;
427
428/// [`Token`] for [`UsageQueue`].
429type UsageQueueToken = Token<UsageQueueInner>;
430const_assert_eq!(mem::size_of::<UsageQueueToken>(), 0);
431
432/// [`Token`] for [task](Task)'s [internal mutable data](`TaskInner::blocked_usage_count`).
433type BlockedUsageCountToken = Token<ShortCounter>;
434const_assert_eq!(mem::size_of::<BlockedUsageCountToken>(), 0);
435
436/// Internal scheduling data about a particular task.
437#[derive(Debug)]
438pub struct TaskInner {
439    transaction: RuntimeTransaction<SanitizedTransaction>,
440    /// The index of a transaction in ledger entries; not used by SchedulingStateMachine itself.
441    /// Carrying this along with the transaction is needed to properly record its execution
442    /// result.
443    index: usize,
444    lock_contexts: Vec<LockContext>,
445    blocked_usage_count: TokenCell<ShortCounter>,
446    consumed_block_size: BlockSize,
447}
448
449impl TaskInner {
450    pub fn task_index(&self) -> usize {
451        self.index
452    }
453
454    pub fn consumed_block_size(&self) -> BlockSize {
455        self.consumed_block_size
456    }
457
458    pub fn transaction(&self) -> &RuntimeTransaction<SanitizedTransaction> {
459        &self.transaction
460    }
461
462    fn lock_contexts(&self) -> &[LockContext] {
463        &self.lock_contexts
464    }
465
466    fn set_blocked_usage_count(&self, token: &mut BlockedUsageCountToken, count: ShortCounter) {
467        self.blocked_usage_count
468            .with_borrow_mut(token, |usage_count| {
469                *usage_count = count;
470            })
471    }
472
473    #[must_use]
474    fn try_unblock(self: Task, token: &mut BlockedUsageCountToken) -> Option<Task> {
475        let did_unblock = self
476            .blocked_usage_count
477            .with_borrow_mut(token, |usage_count| usage_count.decrement_self().is_zero());
478        did_unblock.then_some(self)
479    }
480
481    pub fn into_transaction(self: Task) -> RuntimeTransaction<SanitizedTransaction> {
482        Task::into_inner(self).unwrap().transaction
483    }
484}
485
486/// [`Task`]'s per-address context to lock a [usage_queue](UsageQueue) with [certain kind of
487/// request](RequestedUsage).
488#[derive(Debug)]
489struct LockContext {
490    usage_queue: UsageQueue,
491    requested_usage: RequestedUsage,
492}
493const_assert_eq!(mem::size_of::<LockContext>(), 16);
494
495impl LockContext {
496    fn new(usage_queue: UsageQueue, requested_usage: RequestedUsage) -> Self {
497        Self {
498            usage_queue,
499            requested_usage,
500        }
501    }
502
503    fn with_usage_queue_mut<R>(
504        &self,
505        usage_queue_token: &mut UsageQueueToken,
506        f: impl FnOnce(&mut UsageQueueInner) -> R,
507    ) -> R {
508        self.usage_queue.0.with_borrow_mut(usage_queue_token, f)
509    }
510}
511
512/// Status about how the [`UsageQueue`] is used currently.
513#[derive(Copy, Clone, Debug)]
514enum Usage {
515    Readonly(ShortCounter),
516    Writable,
517}
518const_assert_eq!(mem::size_of::<Usage>(), 8);
519
520impl From<RequestedUsage> for Usage {
521    fn from(requested_usage: RequestedUsage) -> Self {
522        match requested_usage {
523            RequestedUsage::Readonly => Usage::Readonly(ShortCounter::one()),
524            RequestedUsage::Writable => Usage::Writable,
525        }
526    }
527}
528
529/// Status about how a task is requesting to use a particular [`UsageQueue`].
530#[derive(Clone, Copy, Debug)]
531enum RequestedUsage {
532    Readonly,
533    Writable,
534}
535
536/// Internal scheduling data about a particular address.
537///
538/// Specifically, it holds the current [`Usage`] (or no usage, represented by `None`) and which
539/// [`Task`]s are blocked to be executed after the current task is notified to be finished via
540/// [`::deschedule_task`](`SchedulingStateMachine::deschedule_task`)
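///
/// A hedged sketch of the read/write lock rules enforced by `try_lock()` (internal API, shown
/// only for illustration):
///
/// ```ignore
/// let mut queue = UsageQueueInner::default();
/// assert!(queue.try_lock(RequestedUsage::Readonly).is_ok()); // first reader
/// assert!(queue.try_lock(RequestedUsage::Readonly).is_ok()); // readers can share
/// assert!(queue.try_lock(RequestedUsage::Writable).is_err()); // a writer must wait
/// ```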
541#[derive(Debug)]
542struct UsageQueueInner {
543    current_usage: Option<Usage>,
544    blocked_usages_from_tasks: VecDeque<UsageFromTask>,
545}
546
547type UsageFromTask = (RequestedUsage, Task);
548
549impl Default for UsageQueueInner {
550    fn default() -> Self {
551        Self {
552            current_usage: None,
553            // Capacity should be configurable so that a large capacity like 1024 can be used inside
554            // the (multi-threaded) closures passed to create_task(). In this way, reallocs can be
555            // avoided in the scheduler thread. Also, this configurability is desired for
556            // unified-scheduler-logic's motto: separation of concerns (the pure logic should be
557            // sufficiently distanced from any random knob's constants needed for messy
558            // reality, per the author's personal preference...).
559            //
560            // Note that a large cap should be accompanied by proper scheduler cleaning after use,
561            // which should be handled by higher layers (i.e. scheduler pool).
562            blocked_usages_from_tasks: VecDeque::with_capacity(128),
563        }
564    }
565}
566
567impl UsageQueueInner {
568    fn try_lock(&mut self, requested_usage: RequestedUsage) -> LockResult {
569        match self.current_usage {
570            None => Some(Usage::from(requested_usage)),
571            Some(Usage::Readonly(count)) => match requested_usage {
572                RequestedUsage::Readonly => Some(Usage::Readonly(count.increment())),
573                RequestedUsage::Writable => None,
574            },
575            Some(Usage::Writable) => None,
576        }
577        .inspect(|&new_usage| {
578            self.current_usage = Some(new_usage);
579        })
580        .map(|_| ())
581        .ok_or(())
582    }
583
584    #[must_use]
585    fn unlock(&mut self, requested_usage: RequestedUsage) -> Option<UsageFromTask> {
586        let mut is_unused_now = false;
587        match &mut self.current_usage {
588            Some(Usage::Readonly(ref mut count)) => match requested_usage {
589                RequestedUsage::Readonly => {
590                    if count.is_one() {
591                        is_unused_now = true;
592                    } else {
593                        count.decrement_self();
594                    }
595                }
596                RequestedUsage::Writable => unreachable!(),
597            },
598            Some(Usage::Writable) => match requested_usage {
599                RequestedUsage::Writable => {
600                    is_unused_now = true;
601                }
602                RequestedUsage::Readonly => unreachable!(),
603            },
604            None => unreachable!(),
605        }
606
607        if is_unused_now {
608            self.current_usage = None;
609            self.blocked_usages_from_tasks.pop_front()
610        } else {
611            None
612        }
613    }
614
615    fn push_blocked_usage_from_task(&mut self, usage_from_task: UsageFromTask) {
616        assert_matches!(self.current_usage, Some(_));
617        self.blocked_usages_from_tasks.push_back(usage_from_task);
618    }
619
620    #[must_use]
621    fn pop_unblocked_readonly_usage_from_task(&mut self) -> Option<UsageFromTask> {
622        if matches!(
623            self.blocked_usages_from_tasks.front(),
624            Some((RequestedUsage::Readonly, _))
625        ) {
626            assert_matches!(self.current_usage, Some(Usage::Readonly(_)));
627            self.blocked_usages_from_tasks.pop_front()
628        } else {
629            None
630        }
631    }
632
633    fn has_no_blocked_usage(&self) -> bool {
634        self.blocked_usages_from_tasks.is_empty()
635    }
636}
637
638const_assert_eq!(mem::size_of::<TokenCell<UsageQueueInner>>(), 40);
639
640/// Scheduler's internal data for each address ([`Pubkey`](`solana_pubkey::Pubkey`)). A very
641/// opaque wrapper type; no methods other than [`::clone()`](Clone::clone) and
642/// [`::default()`](Default::default).
643///
644/// It's the higher layer's responsibility to ensure that the same instance of UsageQueue is
645/// associated with a given Pubkey at the time of [task](Task) creation.
646#[derive(Debug, Clone, Default)]
647pub struct UsageQueue(Arc<TokenCell<UsageQueueInner>>);
648const_assert_eq!(mem::size_of::<UsageQueue>(), 8);
649
650/// A high-level `struct`, managing the overall scheduling of [tasks](Task), to be used by
651/// `solana-unified-scheduler-pool`.
652pub struct SchedulingStateMachine {
653    unblocked_task_queue: VecDeque<Task>,
654    /// The number of all tasks which aren't `deschedule_task()`-ed yet while their ownership has
655    /// already been transferred to SchedulingStateMachine by `schedule_or_buffer_task()`. In other
656    /// words, they are running right now or buffered by explicit request or by implicit blocking
657    /// due to one of the other _active_ (= running or buffered) conflicting tasks.
658    active_task_count: ShortCounter,
659    /// The number of tasks which are running right now.
660    running_task_count: ShortCounter,
661    /// The maximum number of running tasks at any given moment. While this could be tightly
662    /// related to the number of threads, the terminology here is intentionally abstracted away to
663    /// make this struct purely about logic. As a hypothetical counter-example, tasks could be
664    /// IO-bound, in which case max_running_task_count would be coupled with the IO queue depth instead.
665    max_running_task_count: CounterInner,
666    handled_task_count: ShortCounter,
667    unblocked_task_count: ShortCounter,
668    total_task_count: ShortCounter,
669    count_token: BlockedUsageCountToken,
670    usage_queue_token: UsageQueueToken,
671}
672const_assert_eq!(mem::size_of::<SchedulingStateMachine>(), 56);
673
674impl SchedulingStateMachine {
675    pub fn has_no_running_task(&self) -> bool {
676        self.running_task_count.is_zero()
677    }
678
679    pub fn has_no_active_task(&self) -> bool {
680        self.active_task_count.is_zero()
681    }
682
683    pub fn has_unblocked_task(&self) -> bool {
684        !self.unblocked_task_queue.is_empty()
685    }
686
687    pub fn has_runnable_task(&self) -> bool {
688        self.has_unblocked_task() && self.is_task_runnable()
689    }
690
691    fn is_task_runnable(&self) -> bool {
692        self.running_task_count.current() < self.max_running_task_count
693    }
694
695    pub fn unblocked_task_queue_count(&self) -> usize {
696        self.unblocked_task_queue.len()
697    }
698
699    #[cfg(test)]
700    fn active_task_count(&self) -> CounterInner {
701        self.active_task_count.current()
702    }
703
704    #[cfg(test)]
705    fn handled_task_count(&self) -> CounterInner {
706        self.handled_task_count.current()
707    }
708
709    #[cfg(test)]
710    fn unblocked_task_count(&self) -> CounterInner {
711        self.unblocked_task_count.current()
712    }
713
714    #[cfg(test)]
715    fn total_task_count(&self) -> CounterInner {
716        self.total_task_count.current()
717    }
718
719    /// Schedules the given `task`, returning it if successful.
720    ///
721    /// Returns `Some(task)` if it's immediately scheduled. Otherwise, returns `None`,
722    /// indicating the scheduled task is currently blocked.
723    ///
724    /// Note that this function takes ownership of the task to allow for future optimizations.
725    #[cfg(any(test, doc))]
726    #[must_use]
727    pub fn schedule_task(&mut self, task: Task) -> Option<Task> {
728        self.schedule_or_buffer_task(task, false)
729    }
730
731    /// Adds the given `task` to the internal buffer, even if it's otherwise immediately schedulable.
732    ///
733    /// Put differently, buffering means forcing the task to be blocked unconditionally after the
734    /// normal scheduling processing.
735    ///
736    /// Thus, the task is internally retained inside this [`SchedulingStateMachine`], whether the
737    /// task is blocked or not. Eventually, the buffered task will be returned by one of the later
738    /// invocations of [`schedule_next_unblocked_task()`](Self::schedule_next_unblocked_task).
739    ///
740    /// Note that this function takes ownership of the task to allow for future optimizations.
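    ///
    /// A hedged sketch of how buffering differs from scheduling (task construction elided):
    ///
    /// ```ignore
    /// // Even a non-conflicting task is retained internally when buffered ...
    /// state_machine.buffer_task(task);
    /// // ... and only comes back via the unblocked-task queue.
    /// let task = state_machine.schedule_next_unblocked_task().unwrap();
    /// state_machine.deschedule_task(&task);
    /// ```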
741    pub fn buffer_task(&mut self, task: Task) {
742        self.schedule_or_buffer_task(task, true).unwrap_none();
743    }
744
745    /// Schedules or buffers the given `task`, returning it on success unless buffering is forced.
746    ///
747    /// Refer to [`schedule_task()`](Self::schedule_task) and
748    /// [`buffer_task()`](Self::buffer_task) for the difference between _scheduling_ and
749    /// _buffering_ respectively.
750    ///
751    /// Note that this function takes ownership of the task to allow for future optimizations.
752    #[must_use]
753    pub fn schedule_or_buffer_task(&mut self, task: Task, force_buffering: bool) -> Option<Task> {
754        self.total_task_count.increment_self();
755        self.active_task_count.increment_self();
756        self.try_lock_usage_queues(task).and_then(|task| {
757            // locking succeeded, and then ...
758            if !self.is_task_runnable() || force_buffering {
759                // ... push to unblocked_task_queue, if the task can't run right now or buffering is forced.
760                self.unblocked_task_count.increment_self();
761                self.unblocked_task_queue.push_back(task);
762                None
763            } else {
764                // ... return the task back as schedulable to the caller as-is otherwise.
765                self.running_task_count.increment_self();
766                Some(task)
767            }
768        })
769    }
770
771    #[must_use]
772    pub fn schedule_next_unblocked_task(&mut self) -> Option<Task> {
773        if !self.is_task_runnable() {
774            return None;
775        }
776
777        self.unblocked_task_queue.pop_front().inspect(|_| {
778            self.running_task_count.increment_self();
779            self.unblocked_task_count.increment_self();
780        })
781    }
782
783    /// Deschedules the given scheduled `task`.
784    ///
785    /// This must be called exactly once for each scheduled task to uphold both
786    /// `SchedulingStateMachine` and `UsageQueue` internal state consistency at any given moment of
787    /// time. It's a serious logic error to call this twice with the same task, or not at all after
788    /// scheduling. Similarly, calling this with a task that hasn't been scheduled is also forbidden.
789    ///
790    /// Note that this function intentionally doesn't take ownership of the task to avoid dropping
791    /// tasks inside `SchedulingStateMachine` to provide an offloading-based optimization
792    /// opportunity for callers.
793    pub fn deschedule_task(&mut self, task: &Task) {
794        self.running_task_count.decrement_self();
795        self.active_task_count.decrement_self();
796        self.handled_task_count.increment_self();
797        self.unlock_usage_queues(task);
798    }
799
800    #[must_use]
801    fn try_lock_usage_queues(&mut self, task: Task) -> Option<Task> {
802        let mut blocked_usage_count = ShortCounter::zero();
803
804        for context in task.lock_contexts() {
805            context.with_usage_queue_mut(&mut self.usage_queue_token, |usage_queue| {
806                let lock_result = if usage_queue.has_no_blocked_usage() {
807                    usage_queue.try_lock(context.requested_usage)
808                } else {
809                    LockResult::Err(())
810                };
811                if let Err(()) = lock_result {
812                    blocked_usage_count.increment_self();
813                    let usage_from_task = (context.requested_usage, task.clone());
814                    usage_queue.push_blocked_usage_from_task(usage_from_task);
815                }
816            });
817        }
818
819        // no blocked usage count means success
820        if blocked_usage_count.is_zero() {
821            Some(task)
822        } else {
823            task.set_blocked_usage_count(&mut self.count_token, blocked_usage_count);
824            None
825        }
826    }
827
828    fn unlock_usage_queues(&mut self, task: &Task) {
829        for context in task.lock_contexts() {
830            context.with_usage_queue_mut(&mut self.usage_queue_token, |usage_queue| {
831                let mut unblocked_task_from_queue = usage_queue.unlock(context.requested_usage);
832
833                while let Some((requested_usage, task_with_unblocked_queue)) =
834                    unblocked_task_from_queue
835                {
836                    // When `try_unblock()` returns `None`, meaning unblocking failed this time,
837                    // the task is still blocked by other active tasks' usages. So,
838                    // don't push the task into unblocked_task_queue yet. It can be assumed that every
839                    // task will eventually succeed in being unblocked, and enter this condition
840                    // clause, as long as `SchedulingStateMachine` is used correctly.
841                    if let Some(task) = task_with_unblocked_queue.try_unblock(&mut self.count_token)
842                    {
843                        self.unblocked_task_queue.push_back(task);
844                    }
845
846                    match usage_queue.try_lock(requested_usage) {
847                        LockResult::Ok(()) => {
848                            // Try to further schedule blocked task for parallelism in the case of
849                            // readonly usages
850                            unblocked_task_from_queue =
851                                if matches!(requested_usage, RequestedUsage::Readonly) {
852                                    usage_queue.pop_unblocked_readonly_usage_from_task()
853                                } else {
854                                    None
855                                };
856                        }
857                        LockResult::Err(()) => panic!("should never fail in this context"),
858                    }
859                }
860            });
861        }
862    }
863
864    /// Creates a new task from a [`RuntimeTransaction<SanitizedTransaction>`] with all of
865    /// its corresponding [`UsageQueue`]s preloaded.
866    ///
867    /// A closure (`usage_queue_loader`) is used to delegate the (possibly multi-threaded)
868    /// implementation of [`UsageQueue`] look-up by [`pubkey`](Pubkey) to callers. It's the
869    /// caller's responsibility to ensure the same instance is returned from the closure, given a
870    /// particular pubkey.
871    ///
872    /// A closure is used here to delegate the responsibility of primary ownership of `UsageQueue`
873    /// (and caching/pruning if any) to the caller. `SchedulingStateMachine` guarantees that all
874    /// shared ownership of `UsageQueue`s is released and the `UsageQueue` state is identical to just
875    /// after creation, if `has_no_active_task()` is `true`. Also note that this is desired for
876    /// separation of concerns.
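    ///
    /// A minimal sketch of a caller-side loader that satisfies the same-instance requirement,
    /// mirroring the helper used in this module's tests (a plain `HashMap` cache; any keyed cache
    /// works):
    ///
    /// ```ignore
    /// use std::collections::HashMap;
    ///
    /// let mut usage_queues: HashMap<Pubkey, UsageQueue> = HashMap::new();
    /// let task = SchedulingStateMachine::create_task(transaction, 0, &mut |pubkey| {
    ///     usage_queues.entry(pubkey).or_default().clone()
    /// });
    /// ```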
877    pub fn create_task(
878        transaction: RuntimeTransaction<SanitizedTransaction>,
879        index: usize,
880        usage_queue_loader: &mut impl FnMut(Pubkey) -> UsageQueue,
881    ) -> Task {
882        Self::do_create_task(
883            transaction,
884            index,
885            NO_CONSUMED_BLOCK_SIZE,
886            usage_queue_loader,
887        )
888    }
889
890    pub fn create_block_production_task(
891        transaction: RuntimeTransaction<SanitizedTransaction>,
892        index: usize,
893        consumed_block_size: BlockSize,
894        usage_queue_loader: &mut impl FnMut(Pubkey) -> UsageQueue,
895    ) -> Task {
896        Self::do_create_task(transaction, index, consumed_block_size, usage_queue_loader)
897    }
898
899    fn do_create_task(
900        transaction: RuntimeTransaction<SanitizedTransaction>,
901        index: usize,
902        consumed_block_size: BlockSize,
903        usage_queue_loader: &mut impl FnMut(Pubkey) -> UsageQueue,
904    ) -> Task {
905        // It's crucial for tasks to be validated with
906        // `account_locks::validate_account_locks()` prior to the creation.
907        // That's because it's part of protocol consensus regarding the
908        // rejection of blocks containing malformed transactions
909        // (`AccountLoadedTwice` and `TooManyAccountLocks`). Even more,
910        // `SchedulingStateMachine` can't properly handle transactions with
911        // duplicate addresses (those falling under `AccountLoadedTwice`).
912        //
913        // However, it's okay for now not to call `::validate_account_locks()`
914        // here.
915        //
916        // Currently `replay_stage` is always calling
917        // `::validate_account_locks()` regardless of whether unified-scheduler
918        // is enabled or not at the blockstore
919        // (`Bank::prepare_sanitized_batch()` is called in
920        // `process_entries()`). This verification will be hoisted for
921        // optimization when removing
922        // `--block-verification-method=blockstore-processor`.
923        //
924        // As for `banking_stage` with unified scheduler, it will need to run
925        // `validate_account_locks()` at least once somewhere in the code path.
926        // In the distant future, this function (`create_task()`) should be
927        // adjusted so that both stages do the checks before calling this or do
928        // the checks here, to simplify the two code paths regarding the
929        // essential `validate_account_locks` validation.
930        //
931        // Lastly, `validate_account_locks()` is currently called in
932        // `DefaultTransactionHandler::handle()` via
933        // `Bank::prepare_unlocked_batch_from_single_tx()` as well.
934        // This redundancy is known. It was just left as-is out of an abundance
935        // of caution.
936        let lock_contexts = transaction
937            .message()
938            .account_keys()
939            .iter()
940            .enumerate()
941            .map(|(index, address)| {
942                LockContext::new(
943                    usage_queue_loader(*address),
944                    if transaction.message().is_writable(index) {
945                        RequestedUsage::Writable
946                    } else {
947                        RequestedUsage::Readonly
948                    },
949                )
950            })
951            .collect();
952
953        Task::new(TaskInner {
954            transaction,
955            index,
956            lock_contexts,
957            blocked_usage_count: TokenCell::new(ShortCounter::zero()),
958            consumed_block_size,
959        })
960    }
961
962    /// Rewinds the inactive state machine back to its initialized state.
963    ///
964    /// This isn't called _reset_ to indicate that it isn't safe to call this at any given moment.
965    /// This panics if the state machine hasn't properly been finished (i.e. there should be no
966    /// active task), to uphold the invariants of [`UsageQueue`]s.
967    ///
968    /// This method is intended to allow reusing a SchedulingStateMachine instance (to avoid its `unsafe`
969    /// [constructor](SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling)
970    /// as much as possible) and its (possibly cached) associated [`UsageQueue`]s for processing
971    /// other slots.
972    ///
973    /// There's a related method called [`clear_and_reinitialize()`](Self::clear_and_reinitialize).
974    pub fn reinitialize(&mut self) {
975        assert!(self.has_no_active_task());
976        assert_eq!(self.running_task_count.current(), 0);
977        assert_eq!(self.unblocked_task_queue.len(), 0);
978        // nice trick to ensure all fields are handled here if a new one is added.
979        let Self {
980            unblocked_task_queue: _,
981            active_task_count,
982            running_task_count: _,
983            max_running_task_count: _,
984            handled_task_count,
985            unblocked_task_count,
986            total_task_count,
987            count_token: _,
988            usage_queue_token: _,
989            // don't add ".." here
990        } = self;
991        active_task_count.reset_to_zero();
992        handled_task_count.reset_to_zero();
993        unblocked_task_count.reset_to_zero();
994        total_task_count.reset_to_zero();
995    }
996
997    /// Clears all buffered tasks and immediately rewinds the state machine back to its initialized state.
998    ///
999    /// This method _may_ panic if there are tasks which have been scheduled but haven't been
1000    /// descheduled yet (called active tasks). This is due to the invocation of
1001    /// [`reinitialize()`](Self::reinitialize) at the end. On the other hand, it's guaranteed not to
1002    /// panic otherwise. That's because the first clearing step effectively relaxes the runtime
1003    /// invariant of `reinitialize()` by making the state machine _inactive_ beforehand. After a
1004    /// successful operation, this method returns the number of cleared tasks.
1005    ///
1006    /// Somewhat surprisingly, the clearing logic is the same as the normal (de-)scheduling operation
1007    /// because it is still the fastest way to just clear all tasks, under the consideration of
1008    /// potential later use of [`UsageQueue`]s. That's because `state_machine` doesn't maintain _the
1009    /// global list_ of tasks. Maintaining such a list would incur a needless overhead on scheduling,
1010    /// which isn't strictly needed otherwise.
1011    ///
1012    /// Moreover, the descheduling operation is rather heavily optimized to begin with. All
1013    /// collection ops are just O(1) over the total N of addresses accessed by all active tasks, with
1014    /// no amortized mem ops.
1015    ///
1016    /// Whatever algorithm is chosen, the ultimate goal of this operation is to clear all usage
1017    /// queues. Toward that end, one could alternatively create a temporary hash set over
1018    /// [`UsageQueue`]s on the fly. However, that would be costlier than the above usual descheduling
1019    /// approach due to extra mem ops and many lookups/insertions.
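    ///
    /// A hedged sketch of the intended life cycle across slots (task handling elided):
    ///
    /// ```ignore
    /// // Any buffered-but-unexecuted tasks are drained and counted ...
    /// let cleared_task_count = state_machine.clear_and_reinitialize();
    /// // ... and the state machine is ready for reuse, with counters back at zero.
    /// assert!(state_machine.has_no_active_task());
    /// ```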
1020    pub fn clear_and_reinitialize(&mut self) -> usize {
1021        let mut count = ShortCounter::zero();
1022        while let Some(task) = self.schedule_next_unblocked_task() {
1023            self.deschedule_task(&task);
1024            count.increment_self();
1025        }
1026        self.reinitialize();
1027        count.current().try_into().unwrap()
1028    }
1029
1030    /// Creates a new instance of [`SchedulingStateMachine`] with its `unsafe` fields created as
1031    /// well, thus carrying over `unsafe`.
1032    ///
1033    /// # Safety
1034    /// Call this exactly once for each thread. See [`TokenCell`] for details.
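    ///
    /// # Example
    ///
    /// A minimal sketch; the limit of 8 running tasks is an arbitrary illustration (`None` means
    /// effectively no limit):
    ///
    /// ```ignore
    /// // Safety: called once on the dedicated scheduler thread.
    /// let mut state_machine = unsafe {
    ///     SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling(Some(8))
    /// };
    /// ```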
1035    #[must_use]
1036    pub unsafe fn exclusively_initialize_current_thread_for_scheduling(
1037        max_running_task_count: Option<usize>,
1038    ) -> Self {
1039        // As documented at `CounterInner`, don't expose the rather opinionated choice of unsigned
1040        // integer type (`u32`) to the outer world. So, take the more conventional `usize` and convert
1041        // it to `CounterInner` here, while uncontroversially treating `None` as effectively no limit.
1042        let max_running_task_count = max_running_task_count
1043            .unwrap_or(CounterInner::MAX as usize)
1044            .try_into()
1045            .unwrap();
1046
1047        Self {
1048            // It's very unlikely this is desired to be configurable, like
1049            // `UsageQueueInner::blocked_usages_from_tasks`'s cap.
1050            unblocked_task_queue: VecDeque::with_capacity(1024),
1051            active_task_count: ShortCounter::zero(),
1052            running_task_count: ShortCounter::zero(),
1053            max_running_task_count,
1054            handled_task_count: ShortCounter::zero(),
1055            unblocked_task_count: ShortCounter::zero(),
1056            total_task_count: ShortCounter::zero(),
1057            count_token: unsafe { BlockedUsageCountToken::assume_exclusive_mutating_thread() },
1058            usage_queue_token: unsafe { UsageQueueToken::assume_exclusive_mutating_thread() },
1059        }
1060    }
1061
1062    #[cfg(test)]
1063    unsafe fn exclusively_initialize_current_thread_for_scheduling_for_test() -> Self {
1064        Self::exclusively_initialize_current_thread_for_scheduling(None)
1065    }
1066}
1067
1068#[cfg(test)]
1069mod tests {
1070    use {
1071        super::*,
1072        solana_instruction::{AccountMeta, Instruction},
1073        solana_message::Message,
1074        solana_pubkey::Pubkey,
1075        solana_transaction::{sanitized::SanitizedTransaction, Transaction},
1076        std::{cell::RefCell, collections::HashMap, rc::Rc},
1077    };
1078
1079    fn simplest_transaction() -> RuntimeTransaction<SanitizedTransaction> {
1080        let message = Message::new(&[], Some(&Pubkey::new_unique()));
1081        let unsigned = Transaction::new_unsigned(message);
1082        RuntimeTransaction::from_transaction_for_tests(unsigned)
1083    }
1084
1085    fn transaction_with_readonly_address(
1086        address: Pubkey,
1087    ) -> RuntimeTransaction<SanitizedTransaction> {
1088        let instruction = Instruction {
1089            program_id: Pubkey::default(),
1090            accounts: vec![AccountMeta::new_readonly(address, false)],
1091            data: vec![],
1092        };
1093        let message = Message::new(&[instruction], Some(&Pubkey::new_unique()));
1094        let unsigned = Transaction::new_unsigned(message);
1095        RuntimeTransaction::from_transaction_for_tests(unsigned)
1096    }
1097
1098    fn transaction_with_writable_address(
1099        address: Pubkey,
1100    ) -> RuntimeTransaction<SanitizedTransaction> {
1101        let instruction = Instruction {
1102            program_id: Pubkey::default(),
1103            accounts: vec![AccountMeta::new(address, false)],
1104            data: vec![],
1105        };
1106        let message = Message::new(&[instruction], Some(&Pubkey::new_unique()));
1107        let unsigned = Transaction::new_unsigned(message);
1108        RuntimeTransaction::from_transaction_for_tests(unsigned)
1109    }
1110
1111    fn create_address_loader(
1112        usage_queues: Option<Rc<RefCell<HashMap<Pubkey, UsageQueue>>>>,
1113    ) -> impl FnMut(Pubkey) -> UsageQueue {
1114        let usage_queues = usage_queues.unwrap_or_default();
1115        move |address| {
1116            usage_queues
1117                .borrow_mut()
1118                .entry(address)
1119                .or_default()
1120                .clone()
1121        }
1122    }
1123
1124    #[test]
1125    fn test_scheduling_state_machine_creation() {
1126        let state_machine = unsafe {
1127            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
1128        };
1129        assert_eq!(state_machine.active_task_count(), 0);
1130        assert_eq!(state_machine.total_task_count(), 0);
1131        assert!(state_machine.has_no_active_task());
1132    }
1133
1134    #[test]
1135    fn test_scheduling_state_machine_good_reinitialization() {
1136        let mut state_machine = unsafe {
1137            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
1138        };
1139        state_machine.total_task_count.increment_self();
1140        assert_eq!(state_machine.total_task_count(), 1);
1141        state_machine.reinitialize();
1142        assert_eq!(state_machine.total_task_count(), 0);
1143    }
1144
1145    #[test]
1146    #[should_panic(expected = "assertion failed: self.has_no_active_task()")]
1147    fn test_scheduling_state_machine_bad_reinitialization() {
1148        let mut state_machine = unsafe {
1149            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
1150        };
1151        let address_loader = &mut create_address_loader(None);
1152        let task = SchedulingStateMachine::create_task(simplest_transaction(), 3, address_loader);
1153        state_machine.schedule_task(task).unwrap();
1154        state_machine.reinitialize();
1155    }
1156
1157    #[test]
1158    fn test_create_task() {
1159        let sanitized = simplest_transaction();
1160        let signature = *sanitized.signature();
1161        let task =
1162            SchedulingStateMachine::create_task(sanitized, 3, &mut |_| UsageQueue::default());
1163        assert_eq!(task.task_index(), 3);
1164        assert_eq!(task.transaction().signature(), &signature);
1165    }
1166
1167    #[test]
1168    fn test_non_conflicting_task_related_counts() {
1169        let sanitized = simplest_transaction();
1170        let address_loader = &mut create_address_loader(None);
1171        let task = SchedulingStateMachine::create_task(sanitized, 3, address_loader);
1172
1173        let mut state_machine = unsafe {
1174            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
1175        };
1176        let task = state_machine.schedule_task(task).unwrap();
1177        assert_eq!(state_machine.active_task_count(), 1);
1178        assert_eq!(state_machine.total_task_count(), 1);
1179        state_machine.deschedule_task(&task);
1180        assert_eq!(state_machine.active_task_count(), 0);
1181        assert_eq!(state_machine.total_task_count(), 1);
1182        assert!(state_machine.has_no_active_task());
1183    }
1184
    #[test]
    fn test_conflicting_task_related_counts() {
        let sanitized = simplest_transaction();
        let address_loader = &mut create_address_loader(None);
        let task1 = SchedulingStateMachine::create_task(sanitized.clone(), 101, address_loader);
        let task2 = SchedulingStateMachine::create_task(sanitized.clone(), 102, address_loader);
        let task3 = SchedulingStateMachine::create_task(sanitized.clone(), 103, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
        };
        assert_matches!(
            state_machine
                .schedule_task(task1.clone())
                .map(|t| t.task_index()),
            Some(101)
        );
        assert_matches!(state_machine.schedule_task(task2.clone()), None);

        state_machine.deschedule_task(&task1);
        assert!(state_machine.has_unblocked_task());
        assert_eq!(state_machine.unblocked_task_queue_count(), 1);

        // unblocked_task_count() should be incremented by the schedule_next_unblocked_task() call
        // below, not by the descheduling above; so it's still 0 at this point.
        assert_eq!(state_machine.unblocked_task_count(), 0);
        assert_eq!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(102)
        );
        assert_eq!(state_machine.unblocked_task_count(), 1);

        // there's no blocked task anymore; calling schedule_next_unblocked_task() should be a
        // no-op and shouldn't increment unblocked_task_count().
        assert!(!state_machine.has_unblocked_task());
        assert_matches!(state_machine.schedule_next_unblocked_task(), None);
        assert_eq!(state_machine.unblocked_task_count(), 1);

        assert_eq!(state_machine.unblocked_task_queue_count(), 0);
        state_machine.deschedule_task(&task2);

        assert_matches!(
            state_machine
                .schedule_task(task3.clone())
                .map(|task| task.task_index()),
            Some(103)
        );
        state_machine.deschedule_task(&task3);
        assert!(state_machine.has_no_active_task());
    }

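    // A task scheduled after an earlier conflicting task has already been descheduled must still
    // queue behind the task that was unblocked by that descheduling.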
    #[test]
    fn test_existing_blocking_task_then_newly_scheduled_task() {
        let sanitized = simplest_transaction();
        let address_loader = &mut create_address_loader(None);
        let task1 = SchedulingStateMachine::create_task(sanitized.clone(), 101, address_loader);
        let task2 = SchedulingStateMachine::create_task(sanitized.clone(), 102, address_loader);
        let task3 = SchedulingStateMachine::create_task(sanitized.clone(), 103, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
        };
        assert_matches!(
            state_machine
                .schedule_task(task1.clone())
                .map(|t| t.task_index()),
            Some(101)
        );
        assert_matches!(state_machine.schedule_task(task2.clone()), None);

        assert_eq!(state_machine.unblocked_task_queue_count(), 0);
        state_machine.deschedule_task(&task1);
        assert_eq!(state_machine.unblocked_task_queue_count(), 1);

        // a new task arriving after task1 is already descheduled and task2 got unblocked must
        // still queue behind task2
        assert_matches!(state_machine.schedule_task(task3.clone()), None);

        assert_eq!(state_machine.unblocked_task_count(), 0);
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(102)
        );
        assert_eq!(state_machine.unblocked_task_count(), 1);

        state_machine.deschedule_task(&task2);

        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(103)
        );
        assert_eq!(state_machine.unblocked_task_count(), 2);

        state_machine.deschedule_task(&task3);
        assert!(state_machine.has_no_active_task());
    }

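    // Multiple read-only tasks over the same address don't block each other; handled_task_count()
    // advances as each of them is descheduled.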
    #[test]
    fn test_multiple_readonly_task_and_counts() {
        let conflicting_address = Pubkey::new_unique();
        let sanitized1 = transaction_with_readonly_address(conflicting_address);
        let sanitized2 = transaction_with_readonly_address(conflicting_address);
        let address_loader = &mut create_address_loader(None);
        let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
        let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
        };
        // both read-only tasks should be immediately runnable
        assert_matches!(
            state_machine
                .schedule_task(task1.clone())
                .map(|t| t.task_index()),
            Some(101)
        );
        assert_matches!(
            state_machine
                .schedule_task(task2.clone())
                .map(|t| t.task_index()),
            Some(102)
        );

        assert_eq!(state_machine.active_task_count(), 2);
        assert_eq!(state_machine.handled_task_count(), 0);
        assert_eq!(state_machine.unblocked_task_queue_count(), 0);
        state_machine.deschedule_task(&task1);
        assert_eq!(state_machine.active_task_count(), 1);
        assert_eq!(state_machine.handled_task_count(), 1);
        assert_eq!(state_machine.unblocked_task_queue_count(), 0);
        state_machine.deschedule_task(&task2);
        assert_eq!(state_machine.active_task_count(), 0);
        assert_eq!(state_machine.handled_task_count(), 2);
        assert!(state_machine.has_no_active_task());
    }

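    // A write-locking task stays blocked until every preceding read-locking task over the same
    // address has been descheduled.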
    #[test]
    fn test_all_blocking_readable_tasks_block_writable_task() {
        let conflicting_address = Pubkey::new_unique();
        let sanitized1 = transaction_with_readonly_address(conflicting_address);
        let sanitized2 = transaction_with_readonly_address(conflicting_address);
        let sanitized3 = transaction_with_writable_address(conflicting_address);
        let address_loader = &mut create_address_loader(None);
        let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
        let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);
        let task3 = SchedulingStateMachine::create_task(sanitized3, 103, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
        };
        assert_matches!(
            state_machine
                .schedule_task(task1.clone())
                .map(|t| t.task_index()),
            Some(101)
        );
        assert_matches!(
            state_machine
                .schedule_task(task2.clone())
                .map(|t| t.task_index()),
            Some(102)
        );
        assert_matches!(state_machine.schedule_task(task3.clone()), None);

        assert_eq!(state_machine.active_task_count(), 3);
        assert_eq!(state_machine.handled_task_count(), 0);
        assert_eq!(state_machine.unblocked_task_queue_count(), 0);
        state_machine.deschedule_task(&task1);
        assert_eq!(state_machine.active_task_count(), 2);
        assert_eq!(state_machine.handled_task_count(), 1);
        assert_eq!(state_machine.unblocked_task_queue_count(), 0);
        assert_matches!(state_machine.schedule_next_unblocked_task(), None);
        state_machine.deschedule_task(&task2);
        assert_eq!(state_machine.active_task_count(), 1);
        assert_eq!(state_machine.handled_task_count(), 2);
        assert_eq!(state_machine.unblocked_task_queue_count(), 1);
        // task3 is finally unblocked after all of the read-locking tasks (task1 and task2) are
        // finished.
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(103)
        );
        state_machine.deschedule_task(&task3);
        assert!(state_machine.has_no_active_task());
    }

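    // read -> write -> read over the same address must be fully linearized: the trailing reader
    // can't run together with the leading one because the writer sits between them in the queue.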
    #[test]
    fn test_readonly_then_writable_then_readonly_linearized() {
        let conflicting_address = Pubkey::new_unique();
        let sanitized1 = transaction_with_readonly_address(conflicting_address);
        let sanitized2 = transaction_with_writable_address(conflicting_address);
        let sanitized3 = transaction_with_readonly_address(conflicting_address);
        let address_loader = &mut create_address_loader(None);
        let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
        let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);
        let task3 = SchedulingStateMachine::create_task(sanitized3, 103, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
        };
        assert_matches!(
            state_machine
                .schedule_task(task1.clone())
                .map(|t| t.task_index()),
            Some(101)
        );
        assert_matches!(state_machine.schedule_task(task2.clone()), None);
        assert_matches!(state_machine.schedule_task(task3.clone()), None);

        assert_matches!(state_machine.schedule_next_unblocked_task(), None);
        state_machine.deschedule_task(&task1);
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(102)
        );
        assert_matches!(state_machine.schedule_next_unblocked_task(), None);
        state_machine.deschedule_task(&task2);
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(103)
        );
        assert_matches!(state_machine.schedule_next_unblocked_task(), None);
        state_machine.deschedule_task(&task3);
        assert!(state_machine.has_no_active_task());
    }

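    // A blocked writer becomes runnable as soon as the single reader ahead of it is descheduled.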
    #[test]
    fn test_readonly_then_writable() {
        let conflicting_address = Pubkey::new_unique();
        let sanitized1 = transaction_with_readonly_address(conflicting_address);
        let sanitized2 = transaction_with_writable_address(conflicting_address);
        let address_loader = &mut create_address_loader(None);
        let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
        let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
        };
        assert_matches!(
            state_machine
                .schedule_task(task1.clone())
                .map(|t| t.task_index()),
            Some(101)
        );
        assert_matches!(state_machine.schedule_task(task2.clone()), None);

        // descheduling read-locking task1 should immediately unblock write-locking task2
        state_machine.deschedule_task(&task1);
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(102)
        );
        state_machine.deschedule_task(&task2);
        assert!(state_machine.has_no_active_task());
    }

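    // Descheduling the leading writer unblocks both queued readers at once, while the trailing
    // writer has to wait until both readers have finished.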
    #[test]
    fn test_blocked_tasks_writable_2_readonly_then_writable() {
        let conflicting_address = Pubkey::new_unique();
        let sanitized1 = transaction_with_writable_address(conflicting_address);
        let sanitized2 = transaction_with_readonly_address(conflicting_address);
        let sanitized3 = transaction_with_readonly_address(conflicting_address);
        let sanitized4 = transaction_with_writable_address(conflicting_address);
        let address_loader = &mut create_address_loader(None);
        let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
        let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);
        let task3 = SchedulingStateMachine::create_task(sanitized3, 103, address_loader);
        let task4 = SchedulingStateMachine::create_task(sanitized4, 104, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
        };
        assert_matches!(
            state_machine
                .schedule_task(task1.clone())
                .map(|t| t.task_index()),
            Some(101)
        );
        assert_matches!(state_machine.schedule_task(task2.clone()), None);
        assert_matches!(state_machine.schedule_task(task3.clone()), None);
        assert_matches!(state_machine.schedule_task(task4.clone()), None);

        state_machine.deschedule_task(&task1);
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(102)
        );
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(103)
        );
        // the above deschedule_task(task1) call should only unblock task2 and task3 because they
        // are read-locking; it shouldn't unblock task4 because task4 is write-locking
        assert_matches!(state_machine.schedule_next_unblocked_task(), None);

        state_machine.deschedule_task(&task2);
        // task4 is still blocked...
        assert_matches!(state_machine.schedule_next_unblocked_task(), None);

        state_machine.deschedule_task(&task3);
        // finally, task4 should be unblocked
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(104)
        );
        state_machine.deschedule_task(&task4);
        assert!(state_machine.has_no_active_task());
    }

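    // Even while a task is blocked on one address, the locks it can take (e.g. on its own fee
    // payer) are acquired eagerly at schedule_task() time; this inspects the internal UsageQueue
    // state to confirm that.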
    #[test]
    fn test_gradual_locking() {
        let conflicting_address = Pubkey::new_unique();
        let sanitized1 = transaction_with_writable_address(conflicting_address);
        let sanitized2 = transaction_with_writable_address(conflicting_address);
        let usage_queues = Rc::new(RefCell::new(HashMap::new()));
        let address_loader = &mut create_address_loader(Some(usage_queues.clone()));
        let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
        let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
        };
        assert_matches!(
            state_machine
                .schedule_task(task1.clone())
                .map(|t| t.task_index()),
            Some(101)
        );
        assert_matches!(state_machine.schedule_task(task2.clone()), None);
        let usage_queues = usage_queues.borrow_mut();
        let usage_queue = usage_queues.get(&conflicting_address).unwrap();
        usage_queue
            .0
            .with_borrow_mut(&mut state_machine.usage_queue_token, |usage_queue| {
                assert_matches!(usage_queue.current_usage, Some(Usage::Writable));
            });
        // task2's fee payer should have been locked already by the above schedule_task(task2)
        // call, even though task2 itself is still blocked
        let fee_payer = task2.transaction().message().fee_payer();
        let usage_queue = usage_queues.get(fee_payer).unwrap();
        usage_queue
            .0
            .with_borrow_mut(&mut state_machine.usage_queue_token, |usage_queue| {
                assert_matches!(usage_queue.current_usage, Some(Usage::Writable));
            });
        state_machine.deschedule_task(&task1);
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(102)
        );
        state_machine.deschedule_task(&task2);
        assert!(state_machine.has_no_active_task());
    }

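    // Unlocking a usage queue that was never locked is an internal invariant violation and must
    // be unreachable.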
    #[test]
    #[should_panic(expected = "internal error: entered unreachable code")]
    fn test_unreachable_unlock_conditions1() {
        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
        };
        let usage_queue = UsageQueue::default();
        usage_queue
            .0
            .with_borrow_mut(&mut state_machine.usage_queue_token, |usage_queue| {
                let _ = usage_queue.unlock(RequestedUsage::Writable);
            });
    }

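    // A read-only unlock against a write-locked usage queue must likewise be unreachable.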
    #[test]
    #[should_panic(expected = "internal error: entered unreachable code")]
    fn test_unreachable_unlock_conditions2() {
        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
        };
        let usage_queue = UsageQueue::default();
        usage_queue
            .0
            .with_borrow_mut(&mut state_machine.usage_queue_token, |usage_queue| {
                usage_queue.current_usage = Some(Usage::Writable);
                let _ = usage_queue.unlock(RequestedUsage::Readonly);
            });
    }

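    // A writable unlock against a read-locked usage queue must also be unreachable.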
    #[test]
    #[should_panic(expected = "internal error: entered unreachable code")]
    fn test_unreachable_unlock_conditions3() {
        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
        };
        let usage_queue = UsageQueue::default();
        usage_queue
            .0
            .with_borrow_mut(&mut state_machine.usage_queue_token, |usage_queue| {
                usage_queue.current_usage = Some(Usage::Readonly(ShortCounter::one()));
                let _ = usage_queue.unlock(RequestedUsage::Writable);
            });
    }
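
    // A minimal end-to-end sketch (not tied to any particular production flow): schedule a batch
    // of write-conflicting tasks and then drain newly unblocked ones after each descheduling, the
    // way a driving loop might. It only reuses helpers already defined in this test module.
    #[test]
    fn test_schedule_and_drain_sketch() {
        let conflicting_address = Pubkey::new_unique();
        let address_loader = &mut create_address_loader(None);
        let tasks: Vec<_> = (101..=104)
            .map(|index| {
                SchedulingStateMachine::create_task(
                    transaction_with_writable_address(conflicting_address),
                    index,
                    address_loader,
                )
            })
            .collect();

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
        };
        // only the first write-locking task is immediately runnable; the rest queue up behind it
        let mut runnable: Vec<_> = tasks
            .iter()
            .filter_map(|task| state_machine.schedule_task(task.clone()))
            .collect();
        assert_eq!(runnable.len(), 1);

        // treat descheduling as "execution finished" and pick up whatever becomes unblocked
        while let Some(task) = runnable.pop() {
            state_machine.deschedule_task(&task);
            while let Some(unblocked) = state_machine.schedule_next_unblocked_task() {
                runnable.push(unblocked);
            }
        }
        assert!(state_machine.has_no_active_task());
    }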
}