1#![allow(rustdoc::private_intra_doc_links)]
2use {
99 crate::utils::{ShortCounter, Token, TokenCell},
100 assert_matches::assert_matches,
101 solana_pubkey::Pubkey,
102 solana_runtime_transaction::runtime_transaction::RuntimeTransaction,
103 solana_transaction::sanitized::SanitizedTransaction,
104 static_assertions::const_assert_eq,
105 std::{collections::VecDeque, mem, sync::Arc},
106 unwrap_none::UnwrapNone,
107};
108
/// Operational mode the scheduler is driving: replaying an existing block or
/// building a new one.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum SchedulingMode {
    /// Verifying (replaying) an already-produced block.
    BlockVerification,
    /// Producing (building) a new block.
    BlockProduction,
}
114
// Backing integer type for ShortCounter; u32 keeps the counters compact.
type CounterInner = u32;
122
mod utils {
    //! Scheduler-internal utilities: an overflow-checked counter
    //! ([`ShortCounter`]) and a zero-sized token scheme ([`Token`] /
    //! [`TokenCell`]) providing cheap interior mutability whose exclusivity
    //! is enforced by a per-thread, per-type token discipline.

    use {
        crate::CounterInner,
        std::{
            any::{self, TypeId},
            cell::{RefCell, UnsafeCell},
            collections::BTreeSet,
            marker::PhantomData,
            thread,
        },
    };

    /// Small counter over `CounterInner` whose increment/decrement panic on
    /// overflow/underflow (checked arithmetic) instead of wrapping.
    #[derive(Debug, Clone, Copy)]
    pub(super) struct ShortCounter(CounterInner);

    impl ShortCounter {
        /// Counter starting at 0.
        pub(super) fn zero() -> Self {
            Self(0)
        }

        /// Counter starting at 1.
        pub(super) fn one() -> Self {
            Self(1)
        }

        pub(super) fn is_one(&self) -> bool {
            self.0 == 1
        }

        pub(super) fn is_zero(&self) -> bool {
            self.0 == 0
        }

        /// Current raw value.
        pub(super) fn current(&self) -> CounterInner {
            self.0
        }

        /// Returns an incremented copy; panics on overflow.
        #[must_use]
        pub(super) fn increment(self) -> Self {
            Self(self.0.checked_add(1).unwrap())
        }

        /// Returns a decremented copy; panics on underflow.
        #[must_use]
        pub(super) fn decrement(self) -> Self {
            Self(self.0.checked_sub(1).unwrap())
        }

        /// Increments in place; panics on overflow.
        pub(super) fn increment_self(&mut self) -> &mut Self {
            *self = self.increment();
            self
        }

        /// Decrements in place; panics on underflow.
        pub(super) fn decrement_self(&mut self) -> &mut Self {
            *self = self.decrement();
            self
        }

        /// Resets the counter back to 0.
        pub(super) fn reset_to_zero(&mut self) -> &mut Self {
            self.0 = 0;
            self
        }
    }

    /// Cell handing out `&mut V` without runtime borrow tracking; access is
    /// serialized instead by requiring a `&mut Token<V>` (see below).
    #[derive(Debug, Default)]
    pub(super) struct TokenCell<V>(UnsafeCell<V>);

    impl<V> TokenCell<V> {
        /// Wraps `value` in a new cell.
        pub(super) fn new(value: V) -> Self {
            Self(UnsafeCell::new(value))
        }

        /// Runs `f` with mutable access to the inner value.
        ///
        /// The `&mut Token<V>` stands in for a mutable borrow of the cell:
        /// as long as tokens are created only via
        /// [`Token::assume_exclusive_mutating_thread`] (one per type per
        /// thread), at most one such closure can be active for this `V`.
        pub(super) fn with_borrow_mut<R>(
            &self,
            _token: &mut Token<V>,
            f: impl FnOnce(&mut V) -> R,
        ) -> R {
            // SAFETY: exclusivity is delegated to the token discipline; the
            // tests below demonstrate that defeating it is UB.
            f(unsafe { &mut *self.0.get() })
        }
    }

    // SAFETY: cross-thread sharing is sound only because every mutation goes
    // through `&mut Token<V>`, and tokens are expected to be unique per
    // thread per type (see Token::assume_exclusive_mutating_thread).
    unsafe impl<V> Sync for TokenCell<V> {}

    /// Zero-sized mutation capability for `TokenCell<V>`. The `*mut V` inside
    /// `PhantomData` makes it `!Send`/`!Sync`, pinning it to its creating
    /// thread.
    pub(super) struct Token<V: 'static>(PhantomData<*mut V>);

    impl<V> Token<V> {
        /// Creates this thread's unique token for `V`.
        ///
        /// Panics when called twice for the same `V` on one thread (tracked
        /// via a thread-local `BTreeSet<TypeId>`). Unsafe because the
        /// soundness of `TokenCell` rests on token uniqueness: forging extra
        /// tokens (e.g. via `transmute`) is UB, as the tests below show.
        #[must_use]
        pub(super) unsafe fn assume_exclusive_mutating_thread() -> Self {
            thread_local! {
                static TOKENS: RefCell<BTreeSet<TypeId>> = const { RefCell::new(BTreeSet::new()) };
            }
            assert!(
                TOKENS.with_borrow_mut(|tokens| tokens.insert(TypeId::of::<Self>())),
                "{:?} is wrongly initialized twice on {:?}",
                any::type_name::<Self>(),
                thread::current()
            );

            Self(PhantomData)
        }
    }

    #[cfg(test)]
    mod tests {
        use {
            super::{Token, TokenCell},
            std::{mem, sync::Arc, thread},
        };

        // Creating the same token type twice on one thread must panic.
        #[test]
        #[should_panic(
            expected = "\"solana_unified_scheduler_logic::utils::Token<usize>\" is wrongly \
                        initialized twice on Thread"
        )]
        fn test_second_creation_of_tokens_in_a_thread() {
            unsafe {
                let _ = Token::<usize>::assume_exclusive_mutating_thread();
                let _ = Token::<usize>::assume_exclusive_mutating_thread();
            }
        }

        #[derive(Debug)]
        struct FakeQueue {
            v: Vec<u8>,
        }

        // Deliberately demonstrates the UB that forged tokens cause: two
        // overlapping `&mut` borrows of the same cell. Ignored under miri,
        // which would (rightly) flag it.
        #[test]
        #[cfg_attr(miri, ignore)]
        fn test_ub_illegally_created_multiple_tokens() {
            // Bypass assume_exclusive_mutating_thread() via transmute — this
            // is exactly the misuse the unsafe contract forbids.
            let mut token1 = unsafe { mem::transmute::<(), Token<FakeQueue>>(()) };
            let mut token2 = unsafe { mem::transmute::<(), Token<FakeQueue>>(()) };

            let queue = TokenCell::new(FakeQueue {
                v: Vec::with_capacity(20),
            });
            queue.with_borrow_mut(&mut token1, |queue_mut1| {
                queue_mut1.v.push(1);
                queue.with_borrow_mut(&mut token2, |queue_mut2| {
                    queue_mut2.v.push(2);
                    queue_mut1.v.push(3);
                });
                queue_mut1.v.push(4);
            });

            #[cfg(not(miri))]
            dbg!(queue.0.into_inner());
        }

        // Deliberately demonstrates the UB of sharing one TokenCell across
        // threads, each holding its own (legitimately created) token.
        // Ignored under miri.
        #[test]
        #[cfg_attr(miri, ignore)]
        fn test_ub_illegally_shared_token_cell() {
            let queue1 = Arc::new(TokenCell::new(FakeQueue {
                v: Vec::with_capacity(20),
            }));
            let queue2 = queue1.clone();
            #[cfg(not(miri))]
            let queue3 = queue1.clone();

            for _ in 0..10 {
                let (queue1, queue2) = (queue1.clone(), queue2.clone());
                // Each spawned thread gets a fresh thread-local token, so no
                // panic — yet both mutate the same cell concurrently.
                let thread1 = thread::spawn(move || {
                    let mut token = unsafe { Token::assume_exclusive_mutating_thread() };
                    queue1.with_borrow_mut(&mut token, |queue| {
                        queue.v.push(3);
                    });
                });
                let thread2 = thread::spawn(move || {
                    let mut token = unsafe { Token::assume_exclusive_mutating_thread() };
                    queue2.with_borrow_mut(&mut token, |queue| {
                        queue.v.push(4);
                    });
                });

                thread1.join().unwrap();
                thread2.join().unwrap();
            }

            #[cfg(not(miri))]
            {
                drop((queue1, queue2));
                dbg!(Arc::into_inner(queue3).unwrap().0.into_inner());
            }
        }
    }
}
415
/// Outcome of a lock attempt on a usage queue; `Err(())` means blocked.
type LockResult = Result<(), ()>;
const_assert_eq!(mem::size_of::<LockResult>(), 1);

/// Tasks are shared via `Arc` so one task can sit in several usage queues'
/// blocked lists at once.
pub type Task = Arc<TaskInner>;
const_assert_eq!(mem::size_of::<Task>(), 8);

/// Accounting unit for block size consumed by a task (used by the
/// block-production constructor; see `create_block_production_task`).
pub type BlockSize = usize;
/// Sentinel for tasks that don't report consumed block size.
pub const NO_CONSUMED_BLOCK_SIZE: BlockSize = 0;

/// Zero-sized token gating mutation of `UsageQueueInner` cells.
type UsageQueueToken = Token<UsageQueueInner>;
const_assert_eq!(mem::size_of::<UsageQueueToken>(), 0);

/// Zero-sized token gating mutation of tasks' blocked-usage counters.
type BlockedUsageCountToken = Token<ShortCounter>;
const_assert_eq!(mem::size_of::<BlockedUsageCountToken>(), 0);
435
/// Internal representation of a schedulable unit of work (one transaction).
#[derive(Debug)]
pub struct TaskInner {
    // The wrapped transaction to execute.
    transaction: RuntimeTransaction<SanitizedTransaction>,
    // Caller-assigned task index (see task_index()).
    index: usize,
    // One (usage queue, requested access) pair per account key.
    lock_contexts: Vec<LockContext>,
    // How many usage queues currently block this task; mutated only under
    // the BlockedUsageCountToken.
    blocked_usage_count: TokenCell<ShortCounter>,
    // Block size this task consumes; NO_CONSUMED_BLOCK_SIZE unless created
    // via create_block_production_task().
    consumed_block_size: BlockSize,
}
448
449impl TaskInner {
450 pub fn task_index(&self) -> usize {
451 self.index
452 }
453
454 pub fn consumed_block_size(&self) -> BlockSize {
455 self.consumed_block_size
456 }
457
458 pub fn transaction(&self) -> &RuntimeTransaction<SanitizedTransaction> {
459 &self.transaction
460 }
461
462 fn lock_contexts(&self) -> &[LockContext] {
463 &self.lock_contexts
464 }
465
466 fn set_blocked_usage_count(&self, token: &mut BlockedUsageCountToken, count: ShortCounter) {
467 self.blocked_usage_count
468 .with_borrow_mut(token, |usage_count| {
469 *usage_count = count;
470 })
471 }
472
473 #[must_use]
474 fn try_unblock(self: Task, token: &mut BlockedUsageCountToken) -> Option<Task> {
475 let did_unblock = self
476 .blocked_usage_count
477 .with_borrow_mut(token, |usage_count| usage_count.decrement_self().is_zero());
478 did_unblock.then_some(self)
479 }
480
481 pub fn into_transaction(self: Task) -> RuntimeTransaction<SanitizedTransaction> {
482 Task::into_inner(self).unwrap().transaction
483 }
484}
485
/// Pairs one usage queue (one exists per account address) with the access
/// level a task requests on it.
#[derive(Debug)]
struct LockContext {
    usage_queue: UsageQueue,
    requested_usage: RequestedUsage,
}
const_assert_eq!(mem::size_of::<LockContext>(), 16);
494
495impl LockContext {
496 fn new(usage_queue: UsageQueue, requested_usage: RequestedUsage) -> Self {
497 Self {
498 usage_queue,
499 requested_usage,
500 }
501 }
502
503 fn with_usage_queue_mut<R>(
504 &self,
505 usage_queue_token: &mut UsageQueueToken,
506 f: impl FnOnce(&mut UsageQueueInner) -> R,
507 ) -> R {
508 self.usage_queue.0.with_borrow_mut(usage_queue_token, f)
509 }
510}
511
/// Currently-granted usage of a queue: a shared read with a reader count, or
/// an exclusive write.
#[derive(Copy, Clone, Debug)]
enum Usage {
    Readonly(ShortCounter),
    Writable,
}
const_assert_eq!(mem::size_of::<Usage>(), 8);
519
520impl From<RequestedUsage> for Usage {
521 fn from(requested_usage: RequestedUsage) -> Self {
522 match requested_usage {
523 RequestedUsage::Readonly => Usage::Readonly(ShortCounter::one()),
524 RequestedUsage::Writable => Usage::Writable,
525 }
526 }
527}
528
/// Access level a task requests on an account's usage queue.
#[derive(Clone, Copy, Debug)]
enum RequestedUsage {
    Readonly,
    Writable,
}
535
/// Interior state of a usage queue: the currently-granted usage (if any)
/// plus a FIFO of blocked requests waiting for it to free up.
#[derive(Debug)]
struct UsageQueueInner {
    current_usage: Option<Usage>,
    blocked_usages_from_tasks: VecDeque<UsageFromTask>,
}

/// A blocked request: the access level wanted and the task wanting it.
type UsageFromTask = (RequestedUsage, Task);
548
549impl Default for UsageQueueInner {
550 fn default() -> Self {
551 Self {
552 current_usage: None,
553 blocked_usages_from_tasks: VecDeque::with_capacity(128),
563 }
564 }
565}
566
impl UsageQueueInner {
    /// Attempts to grant `requested_usage` on this queue.
    ///
    /// Succeeds when the queue is free, or when both current and requested
    /// usage are readonly (bumping the reader count). Otherwise returns
    /// `Err(())` and leaves `current_usage` untouched.
    fn try_lock(&mut self, requested_usage: RequestedUsage) -> LockResult {
        match self.current_usage {
            None => Some(Usage::from(requested_usage)),
            Some(Usage::Readonly(count)) => match requested_usage {
                RequestedUsage::Readonly => Some(Usage::Readonly(count.increment())),
                RequestedUsage::Writable => None,
            },
            Some(Usage::Writable) => None,
        }
        // Commit the new usage only on success, then collapse to LockResult.
        .inspect(|&new_usage| {
            self.current_usage = Some(new_usage);
        })
        .map(|_| ())
        .ok_or(())
    }

    /// Releases a previously-granted `requested_usage`.
    ///
    /// When the release leaves the queue unused, returns the frontmost
    /// blocked request (if any) so the caller can re-lock for it. Hits
    /// `unreachable!` if the release doesn't match `current_usage` — that
    /// would mean the scheduler's bookkeeping is corrupt.
    #[must_use]
    fn unlock(&mut self, requested_usage: RequestedUsage) -> Option<UsageFromTask> {
        let mut is_unused_now = false;
        match &mut self.current_usage {
            Some(Usage::Readonly(ref mut count)) => match requested_usage {
                RequestedUsage::Readonly => {
                    // Last reader frees the queue; otherwise just drop one.
                    if count.is_one() {
                        is_unused_now = true;
                    } else {
                        count.decrement_self();
                    }
                }
                RequestedUsage::Writable => unreachable!(),
            },
            Some(Usage::Writable) => match requested_usage {
                RequestedUsage::Writable => {
                    is_unused_now = true;
                }
                RequestedUsage::Readonly => unreachable!(),
            },
            None => unreachable!(),
        }

        if is_unused_now {
            self.current_usage = None;
            self.blocked_usages_from_tasks.pop_front()
        } else {
            None
        }
    }

    /// Appends a blocked request; only valid while the queue is locked.
    fn push_blocked_usage_from_task(&mut self, usage_from_task: UsageFromTask) {
        assert_matches!(self.current_usage, Some(_));
        self.blocked_usages_from_tasks.push_back(usage_from_task);
    }

    /// Pops the next blocked request only if it is readonly, letting the
    /// caller grant a run of consecutive readers together. Only valid while
    /// the queue is readonly-locked (asserted).
    #[must_use]
    fn pop_unblocked_readonly_usage_from_task(&mut self) -> Option<UsageFromTask> {
        if matches!(
            self.blocked_usages_from_tasks.front(),
            Some((RequestedUsage::Readonly, _))
        ) {
            assert_matches!(self.current_usage, Some(Usage::Readonly(_)));
            self.blocked_usages_from_tasks.pop_front()
        } else {
            None
        }
    }

    /// True when no request is queued behind the current usage.
    fn has_no_blocked_usage(&self) -> bool {
        self.blocked_usages_from_tasks.is_empty()
    }
}
637
const_assert_eq!(mem::size_of::<TokenCell<UsageQueueInner>>(), 40);

/// Cheaply-clonable handle (one per account address) to the queue tracking
/// that address's lock state and blocked tasks.
#[derive(Debug, Clone, Default)]
pub struct UsageQueue(Arc<TokenCell<UsageQueueInner>>);
const_assert_eq!(mem::size_of::<UsageQueue>(), 8);
649
/// Core scheduling state machine: resolves account-access conflicts between
/// tasks and tracks scheduling statistics.
pub struct SchedulingStateMachine {
    // Conflict-free tasks waiting to be handed out.
    unblocked_task_queue: VecDeque<Task>,
    // Tasks submitted but not yet descheduled.
    active_task_count: ShortCounter,
    // Tasks currently handed out for execution.
    running_task_count: ShortCounter,
    // Cap on concurrently running tasks (see is_task_runnable()).
    max_running_task_count: CounterInner,
    // Tasks fully descheduled so far.
    handled_task_count: ShortCounter,
    // Tasks that have passed through the unblocked queue (or were buffered).
    unblocked_task_count: ShortCounter,
    // Every task ever submitted via schedule_or_buffer_task().
    total_task_count: ShortCounter,
    // Thread-singleton tokens gating TokenCell mutation (both zero-sized).
    count_token: BlockedUsageCountToken,
    usage_queue_token: UsageQueueToken,
}
const_assert_eq!(mem::size_of::<SchedulingStateMachine>(), 56);
673
impl SchedulingStateMachine {
    /// True when no task is currently marked running.
    pub fn has_no_running_task(&self) -> bool {
        self.running_task_count.is_zero()
    }

    /// True when every submitted task has been descheduled.
    pub fn has_no_active_task(&self) -> bool {
        self.active_task_count.is_zero()
    }

    /// True when at least one conflict-free task is queued.
    pub fn has_unblocked_task(&self) -> bool {
        !self.unblocked_task_queue.is_empty()
    }

    /// True when an unblocked task exists *and* the running cap permits
    /// scheduling it.
    pub fn has_runnable_task(&self) -> bool {
        self.has_unblocked_task() && self.is_task_runnable()
    }

    /// Whether another task may be marked running under the cap.
    fn is_task_runnable(&self) -> bool {
        self.running_task_count.current() < self.max_running_task_count
    }

    /// Number of conflict-free tasks waiting to be scheduled.
    pub fn unblocked_task_queue_count(&self) -> usize {
        self.unblocked_task_queue.len()
    }

    #[cfg(test)]
    fn active_task_count(&self) -> CounterInner {
        self.active_task_count.current()
    }

    #[cfg(test)]
    fn handled_task_count(&self) -> CounterInner {
        self.handled_task_count.current()
    }

    #[cfg(test)]
    fn unblocked_task_count(&self) -> CounterInner {
        self.unblocked_task_count.current()
    }

    #[cfg(test)]
    fn total_task_count(&self) -> CounterInner {
        self.total_task_count.current()
    }

    /// Schedules `task` immediately if possible; `None` means it was blocked
    /// or buffered. Test/doc-only shorthand for
    /// [`Self::schedule_or_buffer_task`] without forced buffering.
    #[cfg(any(test, doc))]
    #[must_use]
    pub fn schedule_task(&mut self, task: Task) -> Option<Task> {
        self.schedule_or_buffer_task(task, false)
    }

    /// Registers `task` but forces it into the internal buffer even when it
    /// could run right away.
    pub fn buffer_task(&mut self, task: Task) {
        self.schedule_or_buffer_task(task, true).unwrap_none();
    }

    /// Registers `task`, returning it only when it is conflict-free, the
    /// running cap isn't reached, and `force_buffering` is false. Otherwise
    /// the task is parked internally: blocked on contended usage queues, or
    /// pushed onto the unblocked queue.
    #[must_use]
    pub fn schedule_or_buffer_task(&mut self, task: Task, force_buffering: bool) -> Option<Task> {
        self.total_task_count.increment_self();
        self.active_task_count.increment_self();
        self.try_lock_usage_queues(task).and_then(|task| {
            // Locking succeeded; still buffer when over the running cap or
            // when the caller explicitly asked for buffering.
            if !self.is_task_runnable() || force_buffering {
                self.unblocked_task_count.increment_self();
                self.unblocked_task_queue.push_back(task);
                None
            } else {
                self.running_task_count.increment_self();
                Some(task)
            }
        })
    }

    /// Pops the next conflict-free task, respecting the running cap.
    #[must_use]
    pub fn schedule_next_unblocked_task(&mut self) -> Option<Task> {
        if !self.is_task_runnable() {
            return None;
        }

        self.unblocked_task_queue.pop_front().inspect(|_| {
            self.running_task_count.increment_self();
            self.unblocked_task_count.increment_self();
        })
    }

    /// Marks a previously-scheduled `task` as finished: updates counters and
    /// releases its usage queues, possibly unblocking waiting tasks.
    /// Expects `task` to be currently running (the counters assume it).
    pub fn deschedule_task(&mut self, task: &Task) {
        self.running_task_count.decrement_self();
        self.active_task_count.decrement_self();
        self.handled_task_count.increment_self();
        self.unlock_usage_queues(task);
    }

    /// Tries to take all locks `task` needs. On any conflict the task is
    /// enqueued on each contended queue, its blocked-usage count is recorded,
    /// and `None` is returned.
    #[must_use]
    fn try_lock_usage_queues(&mut self, task: Task) -> Option<Task> {
        let mut blocked_usage_count = ShortCounter::zero();

        for context in task.lock_contexts() {
            context.with_usage_queue_mut(&mut self.usage_queue_token, |usage_queue| {
                // Preserve FIFO fairness: never jump ahead of already-blocked
                // requests, even if the lock itself would succeed.
                let lock_result = if usage_queue.has_no_blocked_usage() {
                    usage_queue.try_lock(context.requested_usage)
                } else {
                    LockResult::Err(())
                };
                if let Err(()) = lock_result {
                    blocked_usage_count.increment_self();
                    let usage_from_task = (context.requested_usage, task.clone());
                    usage_queue.push_blocked_usage_from_task(usage_from_task);
                }
            });
        }

        if blocked_usage_count.is_zero() {
            Some(task)
        } else {
            task.set_blocked_usage_count(&mut self.count_token, blocked_usage_count);
            None
        }
    }

    /// Releases `task`'s locks. For each queue that becomes free, re-locks it
    /// for as many queued requests as compatible (one writer, or a run of
    /// consecutive readers) and moves fully-unblocked tasks onto the
    /// unblocked queue.
    fn unlock_usage_queues(&mut self, task: &Task) {
        for context in task.lock_contexts() {
            context.with_usage_queue_mut(&mut self.usage_queue_token, |usage_queue| {
                let mut unblocked_task_from_queue = usage_queue.unlock(context.requested_usage);

                while let Some((requested_usage, task_with_unblocked_queue)) =
                    unblocked_task_from_queue
                {
                    // The task becomes schedulable only once *all* of its
                    // blocking queues have released it.
                    if let Some(task) = task_with_unblocked_queue.try_unblock(&mut self.count_token)
                    {
                        self.unblocked_task_queue.push_back(task);
                    }

                    match usage_queue.try_lock(requested_usage) {
                        LockResult::Ok(()) => {
                            // After granting a read, keep draining consecutive
                            // readonly requests so they run concurrently.
                            unblocked_task_from_queue =
                                if matches!(requested_usage, RequestedUsage::Readonly) {
                                    usage_queue.pop_unblocked_readonly_usage_from_task()
                                } else {
                                    None
                                };
                        }
                        LockResult::Err(()) => panic!("should never fail in this context"),
                    }
                }
            });
        }
    }

    /// Creates a [`Task`] with no consumed block size, resolving each account
    /// address to its usage queue via `usage_queue_loader`.
    pub fn create_task(
        transaction: RuntimeTransaction<SanitizedTransaction>,
        index: usize,
        usage_queue_loader: &mut impl FnMut(Pubkey) -> UsageQueue,
    ) -> Task {
        Self::do_create_task(
            transaction,
            index,
            NO_CONSUMED_BLOCK_SIZE,
            usage_queue_loader,
        )
    }

    /// Like [`Self::create_task`], but records the block size the transaction
    /// consumes (block-production use).
    pub fn create_block_production_task(
        transaction: RuntimeTransaction<SanitizedTransaction>,
        index: usize,
        consumed_block_size: BlockSize,
        usage_queue_loader: &mut impl FnMut(Pubkey) -> UsageQueue,
    ) -> Task {
        Self::do_create_task(transaction, index, consumed_block_size, usage_queue_loader)
    }

    /// Shared task construction: derives a read/write [`LockContext`] for
    /// every account key in the transaction's message.
    fn do_create_task(
        transaction: RuntimeTransaction<SanitizedTransaction>,
        index: usize,
        consumed_block_size: BlockSize,
        usage_queue_loader: &mut impl FnMut(Pubkey) -> UsageQueue,
    ) -> Task {
        let lock_contexts = transaction
            .message()
            .account_keys()
            .iter()
            .enumerate()
            .map(|(index, address)| {
                LockContext::new(
                    usage_queue_loader(*address),
                    if transaction.message().is_writable(index) {
                        RequestedUsage::Writable
                    } else {
                        RequestedUsage::Readonly
                    },
                )
            })
            .collect();

        Task::new(TaskInner {
            transaction,
            index,
            lock_contexts,
            blocked_usage_count: TokenCell::new(ShortCounter::zero()),
            consumed_block_size,
        })
    }

    /// Resets the statistics counters; panics unless the machine is idle (no
    /// active or running task, empty unblocked queue).
    pub fn reinitialize(&mut self) {
        assert!(self.has_no_active_task());
        assert_eq!(self.running_task_count.current(), 0);
        assert_eq!(self.unblocked_task_queue.len(), 0);
        // Exhaustive destructuring: adding a field forces revisiting this.
        let Self {
            unblocked_task_queue: _,
            active_task_count,
            running_task_count: _,
            max_running_task_count: _,
            handled_task_count,
            unblocked_task_count,
            total_task_count,
            count_token: _,
            usage_queue_token: _,
        } = self;
        active_task_count.reset_to_zero();
        handled_task_count.reset_to_zero();
        unblocked_task_count.reset_to_zero();
        total_task_count.reset_to_zero();
    }

    /// Drains and deschedules all remaining unblocked tasks, then
    /// reinitializes. Returns how many tasks were drained.
    pub fn clear_and_reinitialize(&mut self) -> usize {
        let mut count = ShortCounter::zero();
        while let Some(task) = self.schedule_next_unblocked_task() {
            self.deschedule_task(&task);
            count.increment_self();
        }
        self.reinitialize();
        count.current().try_into().unwrap()
    }

    /// Creates the state machine for the current thread.
    ///
    /// `None` for `max_running_task_count` means effectively uncapped
    /// (`CounterInner::MAX`).
    ///
    /// # Safety
    /// Creates the thread-singleton [`Token`]s via
    /// `assume_exclusive_mutating_thread()`: at most one state machine may be
    /// created per thread (a second creation panics), and the tokens'
    /// exclusivity assumptions must not be defeated (see `utils`).
    #[must_use]
    pub unsafe fn exclusively_initialize_current_thread_for_scheduling(
        max_running_task_count: Option<usize>,
    ) -> Self {
        let max_running_task_count = max_running_task_count
            .unwrap_or(CounterInner::MAX as usize)
            .try_into()
            .unwrap();

        Self {
            unblocked_task_queue: VecDeque::with_capacity(1024),
            active_task_count: ShortCounter::zero(),
            running_task_count: ShortCounter::zero(),
            max_running_task_count,
            handled_task_count: ShortCounter::zero(),
            unblocked_task_count: ShortCounter::zero(),
            total_task_count: ShortCounter::zero(),
            count_token: unsafe { BlockedUsageCountToken::assume_exclusive_mutating_thread() },
            usage_queue_token: unsafe { UsageQueueToken::assume_exclusive_mutating_thread() },
        }
    }

    /// Test-only shorthand with no running-task cap.
    #[cfg(test)]
    unsafe fn exclusively_initialize_current_thread_for_scheduling_for_test() -> Self {
        Self::exclusively_initialize_current_thread_for_scheduling(None)
    }
}
1067
1068#[cfg(test)]
1069mod tests {
1070 use {
1071 super::*,
1072 solana_instruction::{AccountMeta, Instruction},
1073 solana_message::Message,
1074 solana_pubkey::Pubkey,
1075 solana_transaction::{sanitized::SanitizedTransaction, Transaction},
1076 std::{cell::RefCell, collections::HashMap, rc::Rc},
1077 };
1078
1079 fn simplest_transaction() -> RuntimeTransaction<SanitizedTransaction> {
1080 let message = Message::new(&[], Some(&Pubkey::new_unique()));
1081 let unsigned = Transaction::new_unsigned(message);
1082 RuntimeTransaction::from_transaction_for_tests(unsigned)
1083 }
1084
1085 fn transaction_with_readonly_address(
1086 address: Pubkey,
1087 ) -> RuntimeTransaction<SanitizedTransaction> {
1088 let instruction = Instruction {
1089 program_id: Pubkey::default(),
1090 accounts: vec![AccountMeta::new_readonly(address, false)],
1091 data: vec![],
1092 };
1093 let message = Message::new(&[instruction], Some(&Pubkey::new_unique()));
1094 let unsigned = Transaction::new_unsigned(message);
1095 RuntimeTransaction::from_transaction_for_tests(unsigned)
1096 }
1097
1098 fn transaction_with_writable_address(
1099 address: Pubkey,
1100 ) -> RuntimeTransaction<SanitizedTransaction> {
1101 let instruction = Instruction {
1102 program_id: Pubkey::default(),
1103 accounts: vec![AccountMeta::new(address, false)],
1104 data: vec![],
1105 };
1106 let message = Message::new(&[instruction], Some(&Pubkey::new_unique()));
1107 let unsigned = Transaction::new_unsigned(message);
1108 RuntimeTransaction::from_transaction_for_tests(unsigned)
1109 }
1110
1111 fn create_address_loader(
1112 usage_queues: Option<Rc<RefCell<HashMap<Pubkey, UsageQueue>>>>,
1113 ) -> impl FnMut(Pubkey) -> UsageQueue {
1114 let usage_queues = usage_queues.unwrap_or_default();
1115 move |address| {
1116 usage_queues
1117 .borrow_mut()
1118 .entry(address)
1119 .or_default()
1120 .clone()
1121 }
1122 }
1123
    // A freshly-initialized state machine starts with all counters at zero.
    #[test]
    fn test_scheduling_state_machine_creation() {
        let state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
        };
        assert_eq!(state_machine.active_task_count(), 0);
        assert_eq!(state_machine.total_task_count(), 0);
        assert!(state_machine.has_no_active_task());
    }
1133
    // reinitialize() resets the statistics counters when the machine is idle.
    #[test]
    fn test_scheduling_state_machine_good_reinitialization() {
        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
        };
        state_machine.total_task_count.increment_self();
        assert_eq!(state_machine.total_task_count(), 1);
        state_machine.reinitialize();
        assert_eq!(state_machine.total_task_count(), 0);
    }
1144
    // reinitialize() must panic while a task is still active (scheduled but
    // not descheduled).
    #[test]
    #[should_panic(expected = "assertion failed: self.has_no_active_task()")]
    fn test_scheduling_state_machine_bad_reinitialization() {
        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
        };
        let address_loader = &mut create_address_loader(None);
        let task = SchedulingStateMachine::create_task(simplest_transaction(), 3, address_loader);
        state_machine.schedule_task(task).unwrap();
        state_machine.reinitialize();
    }
1156
    // create_task() preserves the given index and the wrapped transaction.
    #[test]
    fn test_create_task() {
        let sanitized = simplest_transaction();
        let signature = *sanitized.signature();
        let task =
            SchedulingStateMachine::create_task(sanitized, 3, &mut |_| UsageQueue::default());
        assert_eq!(task.task_index(), 3);
        assert_eq!(task.transaction().signature(), &signature);
    }
1166
    // A lone task schedules immediately; counters move through
    // active -> handled on deschedule.
    #[test]
    fn test_non_conflicting_task_related_counts() {
        let sanitized = simplest_transaction();
        let address_loader = &mut create_address_loader(None);
        let task = SchedulingStateMachine::create_task(sanitized, 3, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
        };
        let task = state_machine.schedule_task(task).unwrap();
        assert_eq!(state_machine.active_task_count(), 1);
        assert_eq!(state_machine.total_task_count(), 1);
        state_machine.deschedule_task(&task);
        assert_eq!(state_machine.active_task_count(), 0);
        assert_eq!(state_machine.total_task_count(), 1);
        assert!(state_machine.has_no_active_task());
    }
1184
    // Three tasks sharing one writable fee-payer address: task2 blocks behind
    // task1, unblocks on deschedule, and task3 schedules freely afterwards.
    #[test]
    fn test_conflicting_task_related_counts() {
        let sanitized = simplest_transaction();
        let address_loader = &mut create_address_loader(None);
        let task1 = SchedulingStateMachine::create_task(sanitized.clone(), 101, address_loader);
        let task2 = SchedulingStateMachine::create_task(sanitized.clone(), 102, address_loader);
        let task3 = SchedulingStateMachine::create_task(sanitized.clone(), 103, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
        };
        assert_matches!(
            state_machine
                .schedule_task(task1.clone())
                .map(|t| t.task_index()),
            Some(101)
        );
        // task2 conflicts with the still-running task1.
        assert_matches!(state_machine.schedule_task(task2.clone()), None);

        // Descheduling task1 moves task2 onto the unblocked queue.
        state_machine.deschedule_task(&task1);
        assert!(state_machine.has_unblocked_task());
        assert_eq!(state_machine.unblocked_task_queue_count(), 1);

        // unblocked_task_count ticks only when the task is actually popped.
        assert_eq!(state_machine.unblocked_task_count(), 0);
        assert_eq!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(102)
        );
        assert_eq!(state_machine.unblocked_task_count(), 1);

        assert!(!state_machine.has_unblocked_task());
        assert_matches!(state_machine.schedule_next_unblocked_task(), None);
        assert_eq!(state_machine.unblocked_task_count(), 1);

        assert_eq!(state_machine.unblocked_task_queue_count(), 0);
        state_machine.deschedule_task(&task2);

        // With the address free again, task3 schedules immediately.
        assert_matches!(
            state_machine
                .schedule_task(task3.clone())
                .map(|task| task.task_index()),
            Some(103)
        );
        state_machine.deschedule_task(&task3);
        assert!(state_machine.has_no_active_task());
    }
1236
    // A newly-submitted task (task3) must queue behind an already-blocked
    // task (task2) even after the blocker (task1) has been descheduled:
    // FIFO order on the contended address is preserved.
    #[test]
    fn test_existing_blocking_task_then_newly_scheduled_task() {
        let sanitized = simplest_transaction();
        let address_loader = &mut create_address_loader(None);
        let task1 = SchedulingStateMachine::create_task(sanitized.clone(), 101, address_loader);
        let task2 = SchedulingStateMachine::create_task(sanitized.clone(), 102, address_loader);
        let task3 = SchedulingStateMachine::create_task(sanitized.clone(), 103, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
        };
        assert_matches!(
            state_machine
                .schedule_task(task1.clone())
                .map(|t| t.task_index()),
            Some(101)
        );
        assert_matches!(state_machine.schedule_task(task2.clone()), None);

        assert_eq!(state_machine.unblocked_task_queue_count(), 0);
        state_machine.deschedule_task(&task1);
        assert_eq!(state_machine.unblocked_task_queue_count(), 1);

        // task3 still blocks: task2 holds the (re-granted) lock.
        assert_matches!(state_machine.schedule_task(task3.clone()), None);

        assert_eq!(state_machine.unblocked_task_count(), 0);
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(102)
        );
        assert_eq!(state_machine.unblocked_task_count(), 1);

        state_machine.deschedule_task(&task2);

        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(103)
        );
        assert_eq!(state_machine.unblocked_task_count(), 2);

        state_machine.deschedule_task(&task3);
        assert!(state_machine.has_no_active_task());
    }
1285
    // Two readonly accesses to the same address don't conflict: both tasks
    // schedule immediately and deschedule independently.
    #[test]
    fn test_multiple_readonly_task_and_counts() {
        let conflicting_address = Pubkey::new_unique();
        let sanitized1 = transaction_with_readonly_address(conflicting_address);
        let sanitized2 = transaction_with_readonly_address(conflicting_address);
        let address_loader = &mut create_address_loader(None);
        let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
        let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
        };
        assert_matches!(
            state_machine
                .schedule_task(task1.clone())
                .map(|t| t.task_index()),
            Some(101)
        );
        assert_matches!(
            state_machine
                .schedule_task(task2.clone())
                .map(|t| t.task_index()),
            Some(102)
        );

        assert_eq!(state_machine.active_task_count(), 2);
        assert_eq!(state_machine.handled_task_count(), 0);
        assert_eq!(state_machine.unblocked_task_queue_count(), 0);
        state_machine.deschedule_task(&task1);
        assert_eq!(state_machine.active_task_count(), 1);
        assert_eq!(state_machine.handled_task_count(), 1);
        assert_eq!(state_machine.unblocked_task_queue_count(), 0);
        state_machine.deschedule_task(&task2);
        assert_eq!(state_machine.active_task_count(), 0);
        assert_eq!(state_machine.handled_task_count(), 2);
        assert!(state_machine.has_no_active_task());
    }
1324
    // A writer blocks behind *all* current readers: it is unblocked only
    // when the last reader deschedules.
    #[test]
    fn test_all_blocking_readable_tasks_block_writable_task() {
        let conflicting_address = Pubkey::new_unique();
        let sanitized1 = transaction_with_readonly_address(conflicting_address);
        let sanitized2 = transaction_with_readonly_address(conflicting_address);
        let sanitized3 = transaction_with_writable_address(conflicting_address);
        let address_loader = &mut create_address_loader(None);
        let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
        let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);
        let task3 = SchedulingStateMachine::create_task(sanitized3, 103, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
        };
        assert_matches!(
            state_machine
                .schedule_task(task1.clone())
                .map(|t| t.task_index()),
            Some(101)
        );
        assert_matches!(
            state_machine
                .schedule_task(task2.clone())
                .map(|t| t.task_index()),
            Some(102)
        );
        assert_matches!(state_machine.schedule_task(task3.clone()), None);

        assert_eq!(state_machine.active_task_count(), 3);
        assert_eq!(state_machine.handled_task_count(), 0);
        assert_eq!(state_machine.unblocked_task_queue_count(), 0);
        state_machine.deschedule_task(&task1);
        assert_eq!(state_machine.active_task_count(), 2);
        assert_eq!(state_machine.handled_task_count(), 1);
        assert_eq!(state_machine.unblocked_task_queue_count(), 0);
        // One reader remains; the writer is still blocked.
        assert_matches!(state_machine.schedule_next_unblocked_task(), None);
        state_machine.deschedule_task(&task2);
        assert_eq!(state_machine.active_task_count(), 1);
        assert_eq!(state_machine.handled_task_count(), 2);
        assert_eq!(state_machine.unblocked_task_queue_count(), 1);
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(103)
        );
        state_machine.deschedule_task(&task3);
        assert!(state_machine.has_no_active_task());
    }
1375
    // read -> write -> read on one address must execute strictly in order:
    // the trailing reader cannot jump ahead of the queued writer.
    #[test]
    fn test_readonly_then_writable_then_readonly_linearized() {
        let conflicting_address = Pubkey::new_unique();
        let sanitized1 = transaction_with_readonly_address(conflicting_address);
        let sanitized2 = transaction_with_writable_address(conflicting_address);
        let sanitized3 = transaction_with_readonly_address(conflicting_address);
        let address_loader = &mut create_address_loader(None);
        let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
        let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);
        let task3 = SchedulingStateMachine::create_task(sanitized3, 103, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
        };
        assert_matches!(
            state_machine
                .schedule_task(task1.clone())
                .map(|t| t.task_index()),
            Some(101)
        );
        assert_matches!(state_machine.schedule_task(task2.clone()), None);
        assert_matches!(state_machine.schedule_task(task3.clone()), None);

        assert_matches!(state_machine.schedule_next_unblocked_task(), None);
        state_machine.deschedule_task(&task1);
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(102)
        );
        assert_matches!(state_machine.schedule_next_unblocked_task(), None);
        state_machine.deschedule_task(&task2);
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(103)
        );
        assert_matches!(state_machine.schedule_next_unblocked_task(), None);
        state_machine.deschedule_task(&task3);
        assert!(state_machine.has_no_active_task());
    }
1419
    // A writer queued behind a single reader unblocks as soon as that reader
    // deschedules.
    #[test]
    fn test_readonly_then_writable() {
        let conflicting_address = Pubkey::new_unique();
        let sanitized1 = transaction_with_readonly_address(conflicting_address);
        let sanitized2 = transaction_with_writable_address(conflicting_address);
        let address_loader = &mut create_address_loader(None);
        let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
        let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
        };
        assert_matches!(
            state_machine
                .schedule_task(task1.clone())
                .map(|t| t.task_index()),
            Some(101)
        );
        assert_matches!(state_machine.schedule_task(task2.clone()), None);

        state_machine.deschedule_task(&task1);
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_index()),
            Some(102)
        );
        state_machine.deschedule_task(&task2);
        assert!(state_machine.has_no_active_task());
    }
1451
1452 #[test]
1453 fn test_blocked_tasks_writable_2_readonly_then_writable() {
1454 let conflicting_address = Pubkey::new_unique();
1455 let sanitized1 = transaction_with_writable_address(conflicting_address);
1456 let sanitized2 = transaction_with_readonly_address(conflicting_address);
1457 let sanitized3 = transaction_with_readonly_address(conflicting_address);
1458 let sanitized4 = transaction_with_writable_address(conflicting_address);
1459 let address_loader = &mut create_address_loader(None);
1460 let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
1461 let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);
1462 let task3 = SchedulingStateMachine::create_task(sanitized3, 103, address_loader);
1463 let task4 = SchedulingStateMachine::create_task(sanitized4, 104, address_loader);
1464
1465 let mut state_machine = unsafe {
1466 SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
1467 };
1468 assert_matches!(
1469 state_machine
1470 .schedule_task(task1.clone())
1471 .map(|t| t.task_index()),
1472 Some(101)
1473 );
1474 assert_matches!(state_machine.schedule_task(task2.clone()), None);
1475 assert_matches!(state_machine.schedule_task(task3.clone()), None);
1476 assert_matches!(state_machine.schedule_task(task4.clone()), None);
1477
1478 state_machine.deschedule_task(&task1);
1479 assert_matches!(
1480 state_machine
1481 .schedule_next_unblocked_task()
1482 .map(|t| t.task_index()),
1483 Some(102)
1484 );
1485 assert_matches!(
1486 state_machine
1487 .schedule_next_unblocked_task()
1488 .map(|t| t.task_index()),
1489 Some(103)
1490 );
1491 assert_matches!(state_machine.schedule_next_unblocked_task(), None);
1494
1495 state_machine.deschedule_task(&task2);
1496 assert_matches!(state_machine.schedule_next_unblocked_task(), None);
1498
1499 state_machine.deschedule_task(&task3);
1500 assert_matches!(
1502 state_machine
1503 .schedule_next_unblocked_task()
1504 .map(|t| t.task_index()),
1505 Some(104)
1506 );
1507 state_machine.deschedule_task(&task4);
1508 assert!(state_machine.has_no_active_task());
1509 }
1510
1511 #[test]
1512 fn test_gradual_locking() {
1513 let conflicting_address = Pubkey::new_unique();
1514 let sanitized1 = transaction_with_writable_address(conflicting_address);
1515 let sanitized2 = transaction_with_writable_address(conflicting_address);
1516 let usage_queues = Rc::new(RefCell::new(HashMap::new()));
1517 let address_loader = &mut create_address_loader(Some(usage_queues.clone()));
1518 let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
1519 let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);
1520
1521 let mut state_machine = unsafe {
1522 SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
1523 };
1524 assert_matches!(
1525 state_machine
1526 .schedule_task(task1.clone())
1527 .map(|t| t.task_index()),
1528 Some(101)
1529 );
1530 assert_matches!(state_machine.schedule_task(task2.clone()), None);
1531 let usage_queues = usage_queues.borrow_mut();
1532 let usage_queue = usage_queues.get(&conflicting_address).unwrap();
1533 usage_queue
1534 .0
1535 .with_borrow_mut(&mut state_machine.usage_queue_token, |usage_queue| {
1536 assert_matches!(usage_queue.current_usage, Some(Usage::Writable));
1537 });
1538 let fee_payer = task2.transaction().message().fee_payer();
1541 let usage_queue = usage_queues.get(fee_payer).unwrap();
1542 usage_queue
1543 .0
1544 .with_borrow_mut(&mut state_machine.usage_queue_token, |usage_queue| {
1545 assert_matches!(usage_queue.current_usage, Some(Usage::Writable));
1546 });
1547 state_machine.deschedule_task(&task1);
1548 assert_matches!(
1549 state_machine
1550 .schedule_next_unblocked_task()
1551 .map(|t| t.task_index()),
1552 Some(102)
1553 );
1554 state_machine.deschedule_task(&task2);
1555 assert!(state_machine.has_no_active_task());
1556 }
1557
1558 #[test]
1559 #[should_panic(expected = "internal error: entered unreachable code")]
1560 fn test_unreachable_unlock_conditions1() {
1561 let mut state_machine = unsafe {
1562 SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
1563 };
1564 let usage_queue = UsageQueue::default();
1565 usage_queue
1566 .0
1567 .with_borrow_mut(&mut state_machine.usage_queue_token, |usage_queue| {
1568 let _ = usage_queue.unlock(RequestedUsage::Writable);
1569 });
1570 }
1571
1572 #[test]
1573 #[should_panic(expected = "internal error: entered unreachable code")]
1574 fn test_unreachable_unlock_conditions2() {
1575 let mut state_machine = unsafe {
1576 SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
1577 };
1578 let usage_queue = UsageQueue::default();
1579 usage_queue
1580 .0
1581 .with_borrow_mut(&mut state_machine.usage_queue_token, |usage_queue| {
1582 usage_queue.current_usage = Some(Usage::Writable);
1583 let _ = usage_queue.unlock(RequestedUsage::Readonly);
1584 });
1585 }
1586
1587 #[test]
1588 #[should_panic(expected = "internal error: entered unreachable code")]
1589 fn test_unreachable_unlock_conditions3() {
1590 let mut state_machine = unsafe {
1591 SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
1592 };
1593 let usage_queue = UsageQueue::default();
1594 usage_queue
1595 .0
1596 .with_borrow_mut(&mut state_machine.usage_queue_token, |usage_queue| {
1597 usage_queue.current_usage = Some(Usage::Readonly(ShortCounter::one()));
1598 let _ = usage_queue.unlock(RequestedUsage::Writable);
1599 });
1600 }
1601}