#![cfg_attr(
    not(feature = "agave-unstable-api"),
    deprecated(
        since = "3.1.0",
        note = "This crate has been marked for formal inclusion in the Agave Unstable API. From \
                v4.0.0 onward, the `agave-unstable-api` crate feature must be specified to \
                acknowledge use of an interface that may break without warning."
    )
)]
#![allow(rustdoc::private_intra_doc_links)]
use {
    crate::utils::{ShortCounter, Token, TokenCell},
    assert_matches::assert_matches,
    solana_pubkey::Pubkey,
    solana_runtime_transaction::runtime_transaction::RuntimeTransaction,
    solana_transaction::sanitized::SanitizedTransaction,
    static_assertions::const_assert_eq,
    std::{
        cmp::Ordering,
        collections::{BTreeMap, VecDeque},
        mem,
        sync::Arc,
    },
    unwrap_none::UnwrapNone,
};

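/// Mode under which the scheduler is operating: verifying an already-produced block or
/// producing a new one.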
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum SchedulingMode {
    BlockVerification,
    BlockProduction,
}

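/// Queueing discipline applied to blocked tasks on each usage queue: plain FIFO order, or
/// priority order keyed by [`OrderedTaskId`] (a lower id wins).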
#[derive(Debug)]
pub enum Capability {
    FifoQueueing,
    PriorityQueueing,
}

type CounterInner = u32;

pub type OrderedTaskId = u128;

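/// Small internal helpers: a checked counter and a zero-sized, thread-affine token scheme that
/// gates mutable access to [`TokenCell`] contents.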
mod utils {
    use {
        crate::CounterInner,
        std::{
            any::{self, TypeId},
            cell::{RefCell, UnsafeCell},
            collections::BTreeSet,
            marker::PhantomData,
            thread,
        },
    };

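    /// A thin wrapper around `CounterInner` whose increment and decrement are checked, so
    /// overflow and underflow panic instead of silently wrapping.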
    #[derive(Debug, Clone, Copy)]
    pub(super) struct ShortCounter(CounterInner);

    impl ShortCounter {
        pub(super) fn zero() -> Self {
            Self(0)
        }

        pub(super) fn one() -> Self {
            Self(1)
        }

        pub(super) fn is_one(&self) -> bool {
            self.0 == 1
        }

        pub(super) fn is_zero(&self) -> bool {
            self.0 == 0
        }

        pub(super) fn current(&self) -> CounterInner {
            self.0
        }

        #[must_use]
        pub(super) fn increment(self) -> Self {
            Self(self.0.checked_add(1).unwrap())
        }

        #[must_use]
        pub(super) fn decrement(self) -> Self {
            Self(self.0.checked_sub(1).unwrap())
        }

        pub(super) fn increment_self(&mut self) -> &mut Self {
            *self = self.increment();
            self
        }

        pub(super) fn decrement_self(&mut self) -> &mut Self {
            *self = self.decrement();
            self
        }

        pub(super) fn reset_to_zero(&mut self) -> &mut Self {
            self.0 = 0;
            self
        }
    }

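    /// An `UnsafeCell` wrapper whose contents can only be mutated via `with_borrow_mut` while
    /// holding a `&mut Token<V>`. Exclusive access therefore hinges on `Token` creation being
    /// unique per thread; the tests below show how violating that assumption is undefined
    /// behavior.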
    #[derive(Debug, Default)]
    pub(super) struct TokenCell<V>(UnsafeCell<V>);

    impl<V> TokenCell<V> {
        pub(super) fn new(value: V) -> Self {
            Self(UnsafeCell::new(value))
        }

        pub(super) fn with_borrow_mut<R>(
            &self,
            _token: &mut Token<V>,
            f: impl FnOnce(&mut V) -> R,
        ) -> R {
            f(unsafe { &mut *self.0.get() })
        }
    }

    unsafe impl<V> Sync for TokenCell<V> {}

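    /// A zero-sized, `!Send`/`!Sync` token (via `PhantomData<*mut V>`) proving that the holder
    /// is the thread entitled to mutate `TokenCell<V>` values of this type.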
    pub(super) struct Token<V: 'static>(PhantomData<*mut V>);

    impl<V> Token<V> {
        #[must_use]
        pub(super) unsafe fn assume_exclusive_mutating_thread() -> Self {
            thread_local! {
                static TOKENS: RefCell<BTreeSet<TypeId>> = const { RefCell::new(BTreeSet::new()) };
            }
            assert!(
                TOKENS.with_borrow_mut(|tokens| tokens.insert(TypeId::of::<Self>())),
                "{:?} is wrongly initialized twice on {:?}",
                any::type_name::<Self>(),
                thread::current()
            );

            Self(PhantomData)
        }
    }

    #[cfg(test)]
    mod tests {
        use {
            super::{Token, TokenCell},
            std::{mem, sync::Arc, thread},
        };

        #[test]
        #[should_panic(
            expected = "\"solana_unified_scheduler_logic::utils::Token<usize>\" is wrongly \
                        initialized twice on Thread"
        )]
        fn test_second_creation_of_tokens_in_a_thread() {
            unsafe {
                let _ = Token::<usize>::assume_exclusive_mutating_thread();
                let _ = Token::<usize>::assume_exclusive_mutating_thread();
            }
        }

        #[derive(Debug)]
        struct FakeQueue {
            v: Vec<u8>,
        }

        #[test]
        #[cfg_attr(miri, ignore)]
        fn test_ub_illegally_created_multiple_tokens() {
            let mut token1 = unsafe { mem::transmute::<(), Token<FakeQueue>>(()) };
            let mut token2 = unsafe { mem::transmute::<(), Token<FakeQueue>>(()) };

            let queue = TokenCell::new(FakeQueue {
                v: Vec::with_capacity(20),
            });
            queue.with_borrow_mut(&mut token1, |queue_mut1| {
                queue_mut1.v.push(1);
                queue.with_borrow_mut(&mut token2, |queue_mut2| {
                    queue_mut2.v.push(2);
                    queue_mut1.v.push(3);
                });
                queue_mut1.v.push(4);
            });

            #[cfg(not(miri))]
            dbg!(queue.0.into_inner());
        }

        #[test]
        #[cfg_attr(miri, ignore)]
        fn test_ub_illegally_shared_token_cell() {
            let queue1 = Arc::new(TokenCell::new(FakeQueue {
                v: Vec::with_capacity(20),
            }));
            let queue2 = queue1.clone();
            #[cfg(not(miri))]
            let queue3 = queue1.clone();

            for _ in 0..10 {
                let (queue1, queue2) = (queue1.clone(), queue2.clone());
                let thread1 = thread::spawn(move || {
                    let mut token = unsafe { Token::assume_exclusive_mutating_thread() };
                    queue1.with_borrow_mut(&mut token, |queue| {
                        queue.v.push(3);
                    });
                });
                let thread2 = thread::spawn(move || {
                    let mut token = unsafe { Token::assume_exclusive_mutating_thread() };
                    queue2.with_borrow_mut(&mut token, |queue| {
                        queue.v.push(4);
                    });
                });

                thread1.join().unwrap();
                thread2.join().unwrap();
            }

            #[cfg(not(miri))]
            {
                drop((queue1, queue2));
                dbg!(Arc::into_inner(queue3).unwrap().0.into_inner());
            }
        }
    }
}

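/// Result of a lock attempt on a usage queue: `Ok(())` if the requested usage was (or can be)
/// granted, `Err(())` if the task must wait for the queue to free up.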
type LockResult = Result<(), ()>;
const_assert_eq!(mem::size_of::<LockResult>(), 1);

pub type Task = Arc<TaskInner>;
const_assert_eq!(mem::size_of::<Task>(), 8);

pub type BlockSize = usize;
pub const NO_CONSUMED_BLOCK_SIZE: BlockSize = 0;

type UsageQueueToken = Token<UsageQueueInner>;
const_assert_eq!(mem::size_of::<UsageQueueToken>(), 0);

type BlockedUsageCountToken = Token<ShortCounter>;
const_assert_eq!(mem::size_of::<BlockedUsageCountToken>(), 0);

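/// Internal representation of a scheduled transaction: the transaction itself, its ordering id,
/// one [`LockContext`] per account it touches, and a count of the usages currently blocking it.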
#[derive(Debug)]
pub struct TaskInner {
    transaction: RuntimeTransaction<SanitizedTransaction>,
    task_id: OrderedTaskId,
    lock_contexts: Vec<LockContext>,
    blocked_usage_count: TokenCell<ShortCounter>,
    consumed_block_size: BlockSize,
}

impl TaskInner {
    pub fn task_id(&self) -> OrderedTaskId {
        self.task_id
    }

    pub fn is_higher_priority(&self, other: &Self) -> bool {
        match self.task_id().cmp(&other.task_id()) {
            Ordering::Less => true,
            Ordering::Greater => false,
            Ordering::Equal => panic!("self-comparison"),
        }
    }

    pub fn consumed_block_size(&self) -> BlockSize {
        self.consumed_block_size
    }

    pub fn transaction(&self) -> &RuntimeTransaction<SanitizedTransaction> {
        &self.transaction
    }

    fn lock_contexts(&self) -> &[LockContext] {
        &self.lock_contexts
    }

    fn set_blocked_usage_count(&self, token: &mut BlockedUsageCountToken, count: ShortCounter) {
        self.blocked_usage_count
            .with_borrow_mut(token, |usage_count| {
                *usage_count = count;
            })
    }

    #[must_use]
    fn try_unblock(self: Task, token: &mut BlockedUsageCountToken) -> Option<Task> {
        let did_unblock = self
            .blocked_usage_count
            .with_borrow_mut(token, |usage_count| usage_count.decrement_self().is_zero());
        did_unblock.then_some(self)
    }

    fn try_reblock(&self, token: &mut BlockedUsageCountToken) -> bool {
        self.blocked_usage_count
            .with_borrow_mut(token, |usage_count| {
                if usage_count.is_zero() {
                    false
                } else {
                    usage_count.increment_self();
                    true
                }
            })
    }

    pub fn into_transaction(self: Task) -> RuntimeTransaction<SanitizedTransaction> {
        Task::into_inner(self).unwrap().transaction
    }
}

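/// Pairing of a [`UsageQueue`] (one per account address) with the access the task requested on
/// it (readonly or writable).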
#[derive(Debug)]
struct LockContext {
    usage_queue: UsageQueue,
    requested_usage: RequestedUsage,
}
const_assert_eq!(mem::size_of::<LockContext>(), 16);

impl LockContext {
    fn new(usage_queue: UsageQueue, requested_usage: RequestedUsage) -> Self {
        Self {
            usage_queue,
            requested_usage,
        }
    }

    fn with_usage_queue_mut<R>(
        &self,
        usage_queue_token: &mut UsageQueueToken,
        f: impl FnOnce(&mut UsageQueueInner) -> R,
    ) -> R {
        self.usage_queue.0.with_borrow_mut(usage_queue_token, f)
    }
}

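/// Current holder(s) of a usage queue. `R` and `W` carry the bookkeeping each queueing
/// capability needs: a reader count for FIFO, the actual reader/writer tasks for priority.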
#[derive(Copy, Clone, Debug)]
enum Usage<R, W> {
    Readonly(R),
    Writable(W),
}

impl<R, W> Usage<R, W> {
    fn requested_usage(&self) -> RequestedUsage {
        match self {
            Self::Readonly(_) => RequestedUsage::Readonly,
            Self::Writable(_) => RequestedUsage::Writable,
        }
    }
}

type FifoUsage = Usage<ShortCounter, ()>;
const_assert_eq!(mem::size_of::<FifoUsage>(), 8);

type PriorityUsage = Usage<BTreeMap<OrderedTaskId, Task>, Task>;
const_assert_eq!(mem::size_of::<PriorityUsage>(), 32);

impl From<RequestedUsage> for FifoUsage {
    fn from(requested_usage: RequestedUsage) -> Self {
        match requested_usage {
            RequestedUsage::Readonly => Self::Readonly(ShortCounter::one()),
            RequestedUsage::Writable => Self::Writable(()),
        }
    }
}

impl PriorityUsage {
    fn from(task: Task, requested_usage: RequestedUsage) -> Self {
        match requested_usage {
            RequestedUsage::Readonly => Self::Readonly(BTreeMap::from([(task.task_id(), task)])),
            RequestedUsage::Writable => Self::Writable(task),
        }
    }

    fn take_readable(maybe_usage: &mut Option<Self>) {
        let Some(Self::Readonly(tasks)) = maybe_usage.take() else {
            panic!();
        };
        assert!(tasks.is_empty());
    }

    fn take_writable(maybe_usage: &mut Option<Self>) -> Task {
        let Some(Self::Writable(task)) = maybe_usage.take() else {
            panic!();
        };
        task
    }
}

#[derive(Clone, Copy, Debug)]
enum RequestedUsage {
    Readonly,
    Writable,
}

type PriorityUsageQueue = BTreeMap<OrderedTaskId, UsageFromTask>;

trait PriorityUsageQueueExt: Sized {
    fn insert_usage_from_task(&mut self, usage_from_task: UsageFromTask);
    fn pop_first_usage_from_task(&mut self) -> Option<UsageFromTask>;
    fn first_usage_from_task(&self) -> Option<&UsageFromTask>;
}

impl PriorityUsageQueueExt for PriorityUsageQueue {
    fn insert_usage_from_task(&mut self, usage_from_task: UsageFromTask) {
        self.insert(usage_from_task.1.task_id(), usage_from_task)
            .unwrap_none();
    }

    fn pop_first_usage_from_task(&mut self) -> Option<UsageFromTask> {
        self.pop_first().map(|(_index, usage)| usage)
    }

    fn first_usage_from_task(&self) -> Option<&UsageFromTask> {
        self.first_key_value().map(|(_index, usage)| usage)
    }
}

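/// Per-address lock state: who currently holds the address and which blocked usages are waiting
/// for it, queued according to the selected [`Capability`].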
#[derive(Debug)]
enum UsageQueueInner {
    Fifo {
        current_usage: Option<FifoUsage>,
        blocked_usages_from_tasks: VecDeque<UsageFromTask>,
    },
    Priority {
        current_usage: Option<PriorityUsage>,
        blocked_usages_from_tasks: PriorityUsageQueue,
    },
}

type UsageFromTask = (RequestedUsage, Task);

impl UsageQueueInner {
    fn with_fifo() -> Self {
        Self::Fifo {
            current_usage: None,
            blocked_usages_from_tasks: VecDeque::with_capacity(128),
        }
    }

    fn with_priority() -> Self {
        Self::Priority {
            current_usage: None,
            blocked_usages_from_tasks: PriorityUsageQueue::new(),
        }
    }

    fn new(capability: &Capability) -> Self {
        match capability {
            Capability::FifoQueueing => Self::with_fifo(),
            Capability::PriorityQueueing => Self::with_priority(),
        }
    }
}

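// Lock-state transitions for a single usage queue: taking, releasing, and queueing blocked
// usages. `prepare_lock` additionally decides, for the priority capability, whether lower
// priority holders should be reblocked in favor of an incoming higher-priority task.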
impl UsageQueueInner {
    fn try_lock(&mut self, new_task: &Task, requested_usage: RequestedUsage) -> LockResult {
        match self {
            Self::Fifo { current_usage, .. } => match current_usage {
                None => Ok(FifoUsage::from(requested_usage)),
                Some(FifoUsage::Readonly(count)) => match requested_usage {
                    RequestedUsage::Readonly => Ok(FifoUsage::Readonly(count.increment())),
                    RequestedUsage::Writable => Err(()),
                },
                Some(FifoUsage::Writable(())) => Err(()),
            }
            .map(|new_usage| {
                *current_usage = Some(new_usage);
            }),
            Self::Priority { current_usage, .. } => match current_usage {
                Some(PriorityUsage::Readonly(tasks)) => match requested_usage {
                    RequestedUsage::Readonly => {
                        tasks
                            .insert(new_task.task_id(), new_task.clone())
                            .unwrap_none();
                        Ok(())
                    }
                    RequestedUsage::Writable => Err(()),
                },
                Some(PriorityUsage::Writable(_task)) => Err(()),
                None => {
                    *current_usage = Some(PriorityUsage::from(new_task.clone(), requested_usage));

                    Ok(())
                }
            },
        }
    }

    #[must_use]
    fn unlock(&mut self, task: &Task, requested_usage: RequestedUsage) -> Option<UsageFromTask> {
        let mut is_newly_lockable = false;
        match self {
            Self::Fifo { current_usage, .. } => {
                match current_usage {
                    Some(FifoUsage::Readonly(ref mut count)) => match requested_usage {
                        RequestedUsage::Readonly => {
                            if count.is_one() {
                                is_newly_lockable = true;
                            } else {
                                count.decrement_self();
                            }
                        }
                        RequestedUsage::Writable => unreachable!(),
                    },
                    Some(FifoUsage::Writable(())) => {
                        assert_matches!(requested_usage, RequestedUsage::Writable);
                        is_newly_lockable = true;
                    }
                    None => unreachable!(),
                }
                if is_newly_lockable {
                    *current_usage = None;
                }
            }
            Self::Priority { current_usage, .. } => {
                match current_usage {
                    Some(PriorityUsage::Readonly(tasks)) => match requested_usage {
                        RequestedUsage::Readonly => {
                            tasks.remove(&task.task_id()).unwrap();
                            if tasks.is_empty() {
                                is_newly_lockable = true;
                            }
                        }
                        RequestedUsage::Writable => unreachable!(),
                    },
                    Some(PriorityUsage::Writable(_task)) => {
                        assert_matches!(requested_usage, RequestedUsage::Writable);
                        is_newly_lockable = true;
                    }
                    None => unreachable!(),
                }
                if is_newly_lockable {
                    *current_usage = None;
                }
            }
        }

        if is_newly_lockable {
            self.pop()
        } else {
            None
        }
    }

    fn push_blocked(&mut self, usage_from_task: UsageFromTask) {
        assert_matches!(self.current_usage(), Some(_));
        self.push(usage_from_task);
    }

    #[must_use]
    fn pop_lockable_readonly(&mut self) -> Option<UsageFromTask> {
        if matches!(self.peek_blocked(), Some((RequestedUsage::Readonly, _))) {
            assert_matches!(self.current_usage(), Some(RequestedUsage::Readonly));
            self.pop()
        } else {
            None
        }
    }

    fn current_usage(&self) -> Option<RequestedUsage> {
        match self {
            Self::Fifo { current_usage, .. } => {
                current_usage.as_ref().map(|usage| usage.requested_usage())
            }
            Self::Priority { current_usage, .. } => {
                current_usage.as_ref().map(|usage| usage.requested_usage())
            }
        }
    }

    #[cfg(test)]
    fn update_current_usage(&mut self, requested_usage: RequestedUsage, task: &Task) {
        match self {
            Self::Fifo { current_usage, .. } => {
                *current_usage = Some(FifoUsage::from(requested_usage));
            }
            Self::Priority { current_usage, .. } => {
                *current_usage = Some(PriorityUsage::from(task.clone(), requested_usage));
            }
        }
    }

    fn pop(&mut self) -> Option<UsageFromTask> {
        match self {
            Self::Fifo {
                blocked_usages_from_tasks,
                ..
            } => blocked_usages_from_tasks.pop_front(),
            Self::Priority {
                blocked_usages_from_tasks,
                ..
            } => blocked_usages_from_tasks.pop_first_usage_from_task(),
        }
    }

    fn push(&mut self, usage_from_task: UsageFromTask) {
        match self {
            Self::Fifo {
                blocked_usages_from_tasks,
                ..
            } => blocked_usages_from_tasks.push_back(usage_from_task),
            Self::Priority {
                blocked_usages_from_tasks,
                ..
            } => blocked_usages_from_tasks.insert_usage_from_task(usage_from_task),
        }
    }

    fn peek_blocked(&self) -> Option<&UsageFromTask> {
        match self {
            Self::Fifo {
                blocked_usages_from_tasks,
                ..
            } => blocked_usages_from_tasks.front(),
            Self::Priority {
                blocked_usages_from_tasks,
                ..
            } => blocked_usages_from_tasks.first_usage_from_task(),
        }
    }

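    // Decides whether `new_task` may attempt to take this queue at all. For FIFO queueing the
    // answer is simply "only if nothing is already waiting". For priority queueing, a
    // higher-priority task (lower id) may also displace current holders: those holders are
    // reblocked (their blocked-usage counts bumped via `try_reblock`) and pushed back into the
    // blocked queue so the incoming task can lock ahead of them.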
    fn prepare_lock(
        &mut self,
        token: &mut BlockedUsageCountToken,
        new_task: &Task,
        requested_usage: RequestedUsage,
    ) -> LockResult {
        match self {
            Self::Fifo {
                blocked_usages_from_tasks,
                ..
            } => {
                if blocked_usages_from_tasks.is_empty() {
                    Ok(())
                } else {
                    Err(())
                }
            }
            Self::Priority {
                current_usage,
                blocked_usages_from_tasks,
            } => {
                let mut current_and_requested_usage = (current_usage, requested_usage);

                match &mut current_and_requested_usage {
                    (None, _) => {
                        assert!(blocked_usages_from_tasks.is_empty());
                        Ok(())
                    }
                    (Some(PriorityUsage::Writable(current_task)), _requested_usage) => {
                        if !new_task.is_higher_priority(current_task)
                            || !current_task.try_reblock(token)
                        {
                            return Err(());
                        }
                        let reblocked_task = Usage::take_writable(current_and_requested_usage.0);
                        blocked_usages_from_tasks
                            .insert_usage_from_task((RequestedUsage::Writable, reblocked_task));
                        Ok(())
                    }
                    (Some(PriorityUsage::Readonly(_current_tasks)), RequestedUsage::Readonly) => {
                        let Some((peeked_usage, peeked_task)) = self.peek_blocked() else {
                            return Ok(());
                        };

                        assert_matches!(peeked_usage, RequestedUsage::Writable);
                        if !new_task.is_higher_priority(peeked_task) {
                            return Err(());
                        }
                        Ok(())
                    }
                    (Some(PriorityUsage::Readonly(current_tasks)), RequestedUsage::Writable) => {
                        let task_indexes = current_tasks
                            .range(new_task.task_id()..)
                            .filter_map(|(&task_id, task)| {
                                task.try_reblock(token).then_some(task_id)
                            })
                            .collect::<Vec<OrderedTaskId>>();
                        for task_id in task_indexes.into_iter() {
                            let reblocked_task = current_tasks.remove(&task_id).unwrap();
                            blocked_usages_from_tasks
                                .insert_usage_from_task((RequestedUsage::Readonly, reblocked_task));
                        }

                        if current_tasks.is_empty() {
                            Usage::take_readable(current_and_requested_usage.0);
                            Ok(())
                        } else {
                            Err(())
                        }
                    }
                }
            }
        }
    }
}

const_assert_eq!(mem::size_of::<TokenCell<UsageQueueInner>>(), 56);

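/// Shareable, cheaply clonable handle to the lock state of a single account address.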
#[derive(Debug, Clone)]
pub struct UsageQueue(Arc<TokenCell<UsageQueueInner>>);
const_assert_eq!(mem::size_of::<UsageQueue>(), 8);

impl UsageQueue {
    pub fn new(capability: &Capability) -> Self {
        Self(Arc::new(TokenCell::new(UsageQueueInner::new(capability))))
    }
}

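/// The dependency-aware scheduler core: tasks are admitted with `schedule_or_buffer_task()`,
/// conflicting ones are parked until their account locks free up, and `deschedule_task()`
/// releases locks and unblocks waiters.
///
/// Illustrative flow (a sketch mirroring the unit tests below; not compiled as a doctest, and
/// `tx`/`loader` are placeholders for a transaction and a `Pubkey -> UsageQueue` loader):
///
/// ```ignore
/// // Safety: this thread is dedicated to this state machine.
/// let mut sm = unsafe {
///     SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling(None)
/// };
/// let task = SchedulingStateMachine::create_task(tx, 1, &mut loader);
/// if let Some(task) = sm.schedule_or_buffer_task(task, false) {
///     // Run the task, then release its account locks:
///     sm.deschedule_task(&task);
/// }
/// while let Some(task) = sm.schedule_next_unblocked_task() {
///     sm.deschedule_task(&task);
/// }
/// ```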
pub struct SchedulingStateMachine {
    unblocked_task_queue: VecDeque<Task>,
    active_task_count: ShortCounter,
    running_task_count: ShortCounter,
    max_running_task_count: CounterInner,
    handled_task_count: ShortCounter,
    unblocked_task_count: ShortCounter,
    total_task_count: ShortCounter,
    count_token: BlockedUsageCountToken,
    usage_queue_token: UsageQueueToken,
}
const_assert_eq!(mem::size_of::<SchedulingStateMachine>(), 56);

impl SchedulingStateMachine {
    pub fn has_no_running_task(&self) -> bool {
        self.running_task_count.is_zero()
    }

    pub fn has_no_active_task(&self) -> bool {
        self.active_task_count.is_zero()
    }

    pub fn has_unblocked_task(&self) -> bool {
        !self.unblocked_task_queue.is_empty()
    }

    pub fn has_runnable_task(&self) -> bool {
        self.has_unblocked_task() && self.is_task_runnable()
    }

    fn is_task_runnable(&self) -> bool {
        self.running_task_count.current() < self.max_running_task_count
    }

    pub fn unblocked_task_queue_count(&self) -> usize {
        self.unblocked_task_queue.len()
    }

    #[cfg(test)]
    fn active_task_count(&self) -> CounterInner {
        self.active_task_count.current()
    }

    #[cfg(test)]
    fn handled_task_count(&self) -> CounterInner {
        self.handled_task_count.current()
    }

    #[cfg(test)]
    fn unblocked_task_count(&self) -> CounterInner {
        self.unblocked_task_count.current()
    }

    #[cfg(test)]
    fn total_task_count(&self) -> CounterInner {
        self.total_task_count.current()
    }

    #[cfg(any(test, doc))]
    #[must_use]
    pub fn schedule_task(&mut self, task: Task) -> Option<Task> {
        self.schedule_or_buffer_task(task, false)
    }

    pub fn buffer_task(&mut self, task: Task) {
        self.schedule_or_buffer_task(task, true).unwrap_none();
    }

    #[must_use]
    pub fn schedule_or_buffer_task(&mut self, task: Task, force_buffering: bool) -> Option<Task> {
        self.total_task_count.increment_self();
        self.active_task_count.increment_self();
        self.try_lock_usage_queues(task).and_then(|task| {
            if !self.is_task_runnable() || force_buffering {
                self.unblocked_task_count.increment_self();
                self.unblocked_task_queue.push_back(task);
                None
            } else {
                self.running_task_count.increment_self();
                Some(task)
            }
        })
    }

    #[must_use]
    pub fn schedule_next_unblocked_task(&mut self) -> Option<Task> {
        if !self.is_task_runnable() {
            return None;
        }

        self.unblocked_task_queue.pop_front().inspect(|_| {
            self.running_task_count.increment_self();
            self.unblocked_task_count.increment_self();
        })
    }

    pub fn deschedule_task(&mut self, task: &Task) {
        self.running_task_count.decrement_self();
        self.active_task_count.decrement_self();
        self.handled_task_count.increment_self();
        self.unlock_usage_queues(task);
    }

    #[must_use]
    fn try_lock_usage_queues(&mut self, task: Task) -> Option<Task> {
        let mut blocked_usage_count = ShortCounter::zero();

        for context in task.lock_contexts() {
            context.with_usage_queue_mut(&mut self.usage_queue_token, |usage_queue| {
                let lock_result = usage_queue
                    .prepare_lock(&mut self.count_token, &task, context.requested_usage)
                    .and_then(|()| usage_queue.try_lock(&task, context.requested_usage));
                if let Err(()) = lock_result {
                    blocked_usage_count.increment_self();
                    let usage_from_task = (context.requested_usage, task.clone());
                    usage_queue.push_blocked(usage_from_task);
                }
            });
        }

        if blocked_usage_count.is_zero() {
            Some(task)
        } else {
            task.set_blocked_usage_count(&mut self.count_token, blocked_usage_count);
            None
        }
    }

    fn unlock_usage_queues(&mut self, task: &Task) {
        for context in task.lock_contexts() {
            context.with_usage_queue_mut(&mut self.usage_queue_token, |usage_queue| {
                let mut newly_lockable = usage_queue.unlock(task, context.requested_usage);
                while let Some((lockable_usage, lockable_task)) = newly_lockable {
                    usage_queue
                        .try_lock(&lockable_task, lockable_usage)
                        .unwrap();

                    if let Some(unblocked_task) = lockable_task.try_unblock(&mut self.count_token) {
                        self.unblocked_task_queue.push_back(unblocked_task);
                    }

                    newly_lockable = matches!(lockable_usage, RequestedUsage::Readonly)
                        .then(|| usage_queue.pop_lockable_readonly())
                        .flatten();
                }
            });
        }
    }

    pub fn create_task(
        transaction: RuntimeTransaction<SanitizedTransaction>,
        task_id: OrderedTaskId,
        usage_queue_loader: &mut impl FnMut(Pubkey) -> UsageQueue,
    ) -> Task {
        Self::do_create_task(
            transaction,
            task_id,
            NO_CONSUMED_BLOCK_SIZE,
            usage_queue_loader,
        )
    }

    pub fn create_block_production_task(
        transaction: RuntimeTransaction<SanitizedTransaction>,
        task_id: OrderedTaskId,
        consumed_block_size: BlockSize,
        usage_queue_loader: &mut impl FnMut(Pubkey) -> UsageQueue,
    ) -> Task {
        Self::do_create_task(
            transaction,
            task_id,
            consumed_block_size,
            usage_queue_loader,
        )
    }

    fn do_create_task(
        transaction: RuntimeTransaction<SanitizedTransaction>,
        task_id: OrderedTaskId,
        consumed_block_size: BlockSize,
        usage_queue_loader: &mut impl FnMut(Pubkey) -> UsageQueue,
    ) -> Task {
        let lock_contexts = transaction
            .message()
            .account_keys()
            .iter()
            .enumerate()
            .map(|(task_id, address)| {
                LockContext::new(
                    usage_queue_loader(*address),
                    if transaction.message().is_writable(task_id) {
                        RequestedUsage::Writable
                    } else {
                        RequestedUsage::Readonly
                    },
                )
            })
            .collect();

        Task::new(TaskInner {
            transaction,
            task_id,
            lock_contexts,
            blocked_usage_count: TokenCell::new(ShortCounter::zero()),
            consumed_block_size,
        })
    }

    pub fn reinitialize(&mut self) {
        assert!(self.has_no_active_task());
        assert_eq!(self.running_task_count.current(), 0);
        assert_eq!(self.unblocked_task_queue.len(), 0);
        let Self {
            unblocked_task_queue: _,
            active_task_count,
            running_task_count: _,
            max_running_task_count: _,
            handled_task_count,
            unblocked_task_count,
            total_task_count,
            count_token: _,
            usage_queue_token: _,
        } = self;
        active_task_count.reset_to_zero();
        handled_task_count.reset_to_zero();
        unblocked_task_count.reset_to_zero();
        total_task_count.reset_to_zero();
    }

    pub fn clear_and_reinitialize(&mut self) -> usize {
        let mut count = ShortCounter::zero();
        while let Some(task) = self.schedule_next_unblocked_task() {
            self.deschedule_task(&task);
            count.increment_self();
        }
        self.reinitialize();
        count.current().try_into().unwrap()
    }

    #[must_use]
    pub unsafe fn exclusively_initialize_current_thread_for_scheduling(
        max_running_task_count: Option<usize>,
    ) -> Self {
        let max_running_task_count = max_running_task_count
            .unwrap_or(CounterInner::MAX as usize)
            .try_into()
            .unwrap();

        Self {
            unblocked_task_queue: VecDeque::with_capacity(1024),
            active_task_count: ShortCounter::zero(),
            running_task_count: ShortCounter::zero(),
            max_running_task_count,
            handled_task_count: ShortCounter::zero(),
            unblocked_task_count: ShortCounter::zero(),
            total_task_count: ShortCounter::zero(),
            count_token: unsafe { BlockedUsageCountToken::assume_exclusive_mutating_thread() },
            usage_queue_token: unsafe { UsageQueueToken::assume_exclusive_mutating_thread() },
        }
    }

    #[cfg(test)]
    unsafe fn exclusively_initialize_current_thread_for_scheduling_for_test() -> Self {
        Self::exclusively_initialize_current_thread_for_scheduling(None)
    }
}

#[cfg(test)]
mod tests {
    use {
        super::*,
        solana_instruction::{AccountMeta, Instruction},
        solana_message::Message,
        solana_pubkey::Pubkey,
        solana_transaction::{sanitized::SanitizedTransaction, Transaction},
        std::{
            cell::RefCell,
            collections::HashMap,
            panic::{catch_unwind, resume_unwind, AssertUnwindSafe},
            rc::Rc,
        },
        test_case::test_matrix,
    };

    fn simplest_transaction() -> RuntimeTransaction<SanitizedTransaction> {
        let message = Message::new(&[], Some(&Pubkey::new_unique()));
        let unsigned = Transaction::new_unsigned(message);
        RuntimeTransaction::from_transaction_for_tests(unsigned)
    }

    fn transaction_with_readonly_address(
        address: Pubkey,
    ) -> RuntimeTransaction<SanitizedTransaction> {
        transaction_with_readonly_address_with_payer(address, &Pubkey::new_unique())
    }

    fn transaction_with_readonly_address_with_payer(
        address: Pubkey,
        payer: &Pubkey,
    ) -> RuntimeTransaction<SanitizedTransaction> {
        let instruction = Instruction {
            program_id: Pubkey::default(),
            accounts: vec![AccountMeta::new_readonly(address, false)],
            data: vec![],
        };
        let message = Message::new(&[instruction], Some(payer));
        let unsigned = Transaction::new_unsigned(message);
        RuntimeTransaction::from_transaction_for_tests(unsigned)
    }

    fn transaction_with_writable_address(
        address: Pubkey,
    ) -> RuntimeTransaction<SanitizedTransaction> {
        transaction_with_writable_address_with_payer(address, &Pubkey::new_unique())
    }

    fn transaction_with_writable_address_with_payer(
        address: Pubkey,
        payer: &Pubkey,
    ) -> RuntimeTransaction<SanitizedTransaction> {
        let instruction = Instruction {
            program_id: Pubkey::default(),
            accounts: vec![AccountMeta::new(address, false)],
            data: vec![],
        };
        let message = Message::new(&[instruction], Some(payer));
        let unsigned = Transaction::new_unsigned(message);
        RuntimeTransaction::from_transaction_for_tests(unsigned)
    }

    fn create_address_loader(
        usage_queues: Option<Rc<RefCell<HashMap<Pubkey, UsageQueue>>>>,
        capability: &Capability,
    ) -> impl FnMut(Pubkey) -> UsageQueue + use<'_> {
        let usage_queues = usage_queues.unwrap_or_default();
        move |address| {
            usage_queues
                .borrow_mut()
                .entry(address)
                .or_insert_with(|| UsageQueue::new(capability))
                .clone()
        }
    }

    #[test]
    fn test_scheduling_state_machine_creation() {
        let state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
        };
        assert_eq!(state_machine.active_task_count(), 0);
        assert_eq!(state_machine.total_task_count(), 0);
        assert!(state_machine.has_no_active_task());
    }

    #[test]
    fn test_scheduling_state_machine_good_reinitialization() {
        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
        };
        state_machine.total_task_count.increment_self();
        assert_eq!(state_machine.total_task_count(), 1);
        state_machine.reinitialize();
        assert_eq!(state_machine.total_task_count(), 0);
    }

    #[test_matrix([Capability::FifoQueueing, Capability::PriorityQueueing])]
    #[should_panic(expected = "assertion failed: self.has_no_active_task()")]
    fn test_scheduling_state_machine_bad_reinitialization(capability: Capability) {
        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
        };
        let address_loader = &mut create_address_loader(None, &capability);
        let task = SchedulingStateMachine::create_task(simplest_transaction(), 3, address_loader);
        state_machine.schedule_task(task.clone()).unwrap();
        let bad_reinitialize = catch_unwind(AssertUnwindSafe(|| state_machine.reinitialize()));

        state_machine.deschedule_task(&task);
        if let Err(some_panic) = bad_reinitialize {
            resume_unwind(some_panic);
        }
    }

    #[test_matrix([Capability::FifoQueueing, Capability::PriorityQueueing])]
    fn test_create_task(capability: Capability) {
        let sanitized = simplest_transaction();
        let signature = *sanitized.signature();
        let task = SchedulingStateMachine::create_task(sanitized, 3, &mut |_| {
            UsageQueue::new(&capability)
        });
        assert_eq!(task.task_id(), 3);
        assert_eq!(task.transaction().signature(), &signature);
    }

    #[test_matrix([Capability::FifoQueueing, Capability::PriorityQueueing])]
    fn test_non_conflicting_task_related_counts(capability: Capability) {
        let sanitized = simplest_transaction();
        let address_loader = &mut create_address_loader(None, &capability);
        let task = SchedulingStateMachine::create_task(sanitized, 3, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
        };
        let task = state_machine.schedule_task(task).unwrap();
        assert_eq!(state_machine.active_task_count(), 1);
        assert_eq!(state_machine.total_task_count(), 1);
        state_machine.deschedule_task(&task);
        assert_eq!(state_machine.active_task_count(), 0);
        assert_eq!(state_machine.total_task_count(), 1);
        assert!(state_machine.has_no_active_task());
    }

    #[test_matrix([Capability::FifoQueueing, Capability::PriorityQueueing])]
    fn test_conflicting_task_related_counts(capability: Capability) {
        let sanitized = simplest_transaction();
        let address_loader = &mut create_address_loader(None, &capability);
        let task1 = SchedulingStateMachine::create_task(sanitized.clone(), 101, address_loader);
        let task2 = SchedulingStateMachine::create_task(sanitized.clone(), 102, address_loader);
        let task3 = SchedulingStateMachine::create_task(sanitized.clone(), 103, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
        };
        assert_matches!(
            state_machine
                .schedule_task(task1.clone())
                .map(|t| t.task_id()),
            Some(101)
        );
        assert_matches!(state_machine.schedule_task(task2.clone()), None);

        state_machine.deschedule_task(&task1);
        assert!(state_machine.has_unblocked_task());
        assert_eq!(state_machine.unblocked_task_queue_count(), 1);

        assert_eq!(state_machine.unblocked_task_count(), 0);
        assert_eq!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_id()),
            Some(102)
        );
        assert_eq!(state_machine.unblocked_task_count(), 1);

        assert!(!state_machine.has_unblocked_task());
        assert_matches!(state_machine.schedule_next_unblocked_task(), None);
        assert_eq!(state_machine.unblocked_task_count(), 1);

        assert_eq!(state_machine.unblocked_task_queue_count(), 0);
        state_machine.deschedule_task(&task2);

        assert_matches!(
            state_machine
                .schedule_task(task3.clone())
                .map(|task| task.task_id()),
            Some(103)
        );
        state_machine.deschedule_task(&task3);
        assert!(state_machine.has_no_active_task());
    }

    #[test_matrix([Capability::FifoQueueing, Capability::PriorityQueueing])]
    fn test_existing_blocking_task_then_newly_scheduled_task(capability: Capability) {
        let sanitized = simplest_transaction();
        let address_loader = &mut create_address_loader(None, &capability);
        let task1 = SchedulingStateMachine::create_task(sanitized.clone(), 101, address_loader);
        let task2 = SchedulingStateMachine::create_task(sanitized.clone(), 102, address_loader);
        let task3 = SchedulingStateMachine::create_task(sanitized.clone(), 103, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
        };
        assert_matches!(
            state_machine
                .schedule_task(task1.clone())
                .map(|t| t.task_id()),
            Some(101)
        );
        assert_matches!(state_machine.schedule_task(task2.clone()), None);

        assert_eq!(state_machine.unblocked_task_queue_count(), 0);
        state_machine.deschedule_task(&task1);
        assert_eq!(state_machine.unblocked_task_queue_count(), 1);

        assert_matches!(state_machine.schedule_task(task3.clone()), None);

        assert_eq!(state_machine.unblocked_task_count(), 0);
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_id()),
            Some(102)
        );
        assert_eq!(state_machine.unblocked_task_count(), 1);

        state_machine.deschedule_task(&task2);

        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_id()),
            Some(103)
        );
        assert_eq!(state_machine.unblocked_task_count(), 2);

        state_machine.deschedule_task(&task3);
        assert!(state_machine.has_no_active_task());
    }

    #[test_matrix([Capability::FifoQueueing, Capability::PriorityQueueing])]
    fn test_multiple_readonly_task_and_counts(capability: Capability) {
        let conflicting_address = Pubkey::new_unique();
        let sanitized1 = transaction_with_readonly_address(conflicting_address);
        let sanitized2 = transaction_with_readonly_address(conflicting_address);
        let address_loader = &mut create_address_loader(None, &capability);
        let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
        let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
        };
        assert_matches!(
            state_machine
                .schedule_task(task1.clone())
                .map(|t| t.task_id()),
            Some(101)
        );
        assert_matches!(
            state_machine
                .schedule_task(task2.clone())
                .map(|t| t.task_id()),
            Some(102)
        );

        assert_eq!(state_machine.active_task_count(), 2);
        assert_eq!(state_machine.handled_task_count(), 0);
        assert_eq!(state_machine.unblocked_task_queue_count(), 0);
        state_machine.deschedule_task(&task1);
        assert_eq!(state_machine.active_task_count(), 1);
        assert_eq!(state_machine.handled_task_count(), 1);
        assert_eq!(state_machine.unblocked_task_queue_count(), 0);
        state_machine.deschedule_task(&task2);
        assert_eq!(state_machine.active_task_count(), 0);
        assert_eq!(state_machine.handled_task_count(), 2);
        assert!(state_machine.has_no_active_task());
    }

    #[test_matrix([Capability::FifoQueueing, Capability::PriorityQueueing])]
    fn test_all_blocking_readable_tasks_block_writable_task(capability: Capability) {
        let conflicting_address = Pubkey::new_unique();
        let sanitized1 = transaction_with_readonly_address(conflicting_address);
        let sanitized2 = transaction_with_readonly_address(conflicting_address);
        let sanitized3 = transaction_with_writable_address(conflicting_address);
        let address_loader = &mut create_address_loader(None, &capability);
        let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
        let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);
        let task3 = SchedulingStateMachine::create_task(sanitized3, 103, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
        };
        assert_matches!(
            state_machine
                .schedule_task(task1.clone())
                .map(|t| t.task_id()),
            Some(101)
        );
        assert_matches!(
            state_machine
                .schedule_task(task2.clone())
                .map(|t| t.task_id()),
            Some(102)
        );
        assert_matches!(state_machine.schedule_task(task3.clone()), None);

        assert_eq!(state_machine.active_task_count(), 3);
        assert_eq!(state_machine.handled_task_count(), 0);
        assert_eq!(state_machine.unblocked_task_queue_count(), 0);
        state_machine.deschedule_task(&task1);
        assert_eq!(state_machine.active_task_count(), 2);
        assert_eq!(state_machine.handled_task_count(), 1);
        assert_eq!(state_machine.unblocked_task_queue_count(), 0);
        assert_matches!(state_machine.schedule_next_unblocked_task(), None);
        state_machine.deschedule_task(&task2);
        assert_eq!(state_machine.active_task_count(), 1);
        assert_eq!(state_machine.handled_task_count(), 2);
        assert_eq!(state_machine.unblocked_task_queue_count(), 1);
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_id()),
            Some(103)
        );
        state_machine.deschedule_task(&task3);
        assert!(state_machine.has_no_active_task());
    }

    #[test_matrix([Capability::FifoQueueing, Capability::PriorityQueueing])]
    fn test_readonly_then_writable_then_readonly_linearized(capability: Capability) {
        let conflicting_address = Pubkey::new_unique();
        let sanitized1 = transaction_with_readonly_address(conflicting_address);
        let sanitized2 = transaction_with_writable_address(conflicting_address);
        let sanitized3 = transaction_with_readonly_address(conflicting_address);
        let address_loader = &mut create_address_loader(None, &capability);
        let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
        let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);
        let task3 = SchedulingStateMachine::create_task(sanitized3, 103, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
        };
        assert_matches!(
            state_machine
                .schedule_task(task1.clone())
                .map(|t| t.task_id()),
            Some(101)
        );
        assert_matches!(state_machine.schedule_task(task2.clone()), None);
        assert_matches!(state_machine.schedule_task(task3.clone()), None);

        assert_matches!(state_machine.schedule_next_unblocked_task(), None);
        state_machine.deschedule_task(&task1);
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_id()),
            Some(102)
        );
        assert_matches!(state_machine.schedule_next_unblocked_task(), None);
        state_machine.deschedule_task(&task2);
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_id()),
            Some(103)
        );
        assert_matches!(state_machine.schedule_next_unblocked_task(), None);
        state_machine.deschedule_task(&task3);
        assert!(state_machine.has_no_active_task());
    }

    #[test_matrix([Capability::FifoQueueing, Capability::PriorityQueueing])]
    fn test_readonly_then_writable(capability: Capability) {
        let conflicting_address = Pubkey::new_unique();
        let sanitized1 = transaction_with_readonly_address(conflicting_address);
        let sanitized2 = transaction_with_writable_address(conflicting_address);
        let address_loader = &mut create_address_loader(None, &capability);
        let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
        let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
        };
        assert_matches!(
            state_machine
                .schedule_task(task1.clone())
                .map(|t| t.task_id()),
            Some(101)
        );
        assert_matches!(state_machine.schedule_task(task2.clone()), None);

        state_machine.deschedule_task(&task1);
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_id()),
            Some(102)
        );
        state_machine.deschedule_task(&task2);
        assert!(state_machine.has_no_active_task());
    }

    #[test_matrix([Capability::FifoQueueing, Capability::PriorityQueueing])]
    fn test_blocked_tasks_writable_2_readonly_then_writable(capability: Capability) {
        let conflicting_address = Pubkey::new_unique();
        let sanitized1 = transaction_with_writable_address(conflicting_address);
        let sanitized2 = transaction_with_readonly_address(conflicting_address);
        let sanitized3 = transaction_with_readonly_address(conflicting_address);
        let sanitized4 = transaction_with_writable_address(conflicting_address);
        let address_loader = &mut create_address_loader(None, &capability);
        let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
        let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);
        let task3 = SchedulingStateMachine::create_task(sanitized3, 103, address_loader);
        let task4 = SchedulingStateMachine::create_task(sanitized4, 104, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
        };
        assert_matches!(
            state_machine
                .schedule_task(task1.clone())
                .map(|t| t.task_id()),
            Some(101)
        );
        assert_matches!(state_machine.schedule_task(task2.clone()), None);
        assert_matches!(state_machine.schedule_task(task3.clone()), None);
        assert_matches!(state_machine.schedule_task(task4.clone()), None);

        state_machine.deschedule_task(&task1);
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_id()),
            Some(102)
        );
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_id()),
            Some(103)
        );
        assert_matches!(state_machine.schedule_next_unblocked_task(), None);

        state_machine.deschedule_task(&task2);
        assert_matches!(state_machine.schedule_next_unblocked_task(), None);

        state_machine.deschedule_task(&task3);
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_id()),
            Some(104)
        );
        state_machine.deschedule_task(&task4);
        assert!(state_machine.has_no_active_task());
    }

    #[test_matrix([Capability::FifoQueueing, Capability::PriorityQueueing])]
    fn test_gradual_locking(capability: Capability) {
        let conflicting_address = Pubkey::new_unique();
        let sanitized1 = transaction_with_writable_address(conflicting_address);
        let sanitized2 = transaction_with_writable_address(conflicting_address);
        let usage_queues = Rc::new(RefCell::new(HashMap::new()));
        let address_loader = &mut create_address_loader(Some(usage_queues.clone()), &capability);
        let task1 = SchedulingStateMachine::create_task(sanitized1, 101, address_loader);
        let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader);

        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
        };
        assert_matches!(
            state_machine
                .schedule_task(task1.clone())
                .map(|t| t.task_id()),
            Some(101)
        );
        assert_matches!(state_machine.schedule_task(task2.clone()), None);
        let usage_queues = usage_queues.borrow_mut();
        let usage_queue = usage_queues.get(&conflicting_address).unwrap();
        usage_queue
            .0
            .with_borrow_mut(&mut state_machine.usage_queue_token, |usage_queue| {
                assert_matches!(usage_queue.current_usage(), Some(RequestedUsage::Writable));
            });
        let fee_payer = task2.transaction().message().fee_payer();
        let usage_queue = usage_queues.get(fee_payer).unwrap();
        usage_queue
            .0
            .with_borrow_mut(&mut state_machine.usage_queue_token, |usage_queue| {
                assert_matches!(usage_queue.current_usage(), Some(RequestedUsage::Writable));
            });
        state_machine.deschedule_task(&task1);
        assert_matches!(
            state_machine
                .schedule_next_unblocked_task()
                .map(|t| t.task_id()),
            Some(102)
        );
        state_machine.deschedule_task(&task2);
        assert!(state_machine.has_no_active_task());
    }

    #[test_matrix([Capability::FifoQueueing, Capability::PriorityQueueing])]
    #[should_panic(expected = "internal error: entered unreachable code")]
    fn test_unreachable_unlock_conditions1(capability: Capability) {
        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
        };
        let usage_queue = UsageQueue::new(&capability);
        let sanitized = simplest_transaction();
        let task = &SchedulingStateMachine::create_task(sanitized, 3, &mut |_| {
            UsageQueue::new(&capability)
        });
        usage_queue
            .0
            .with_borrow_mut(&mut state_machine.usage_queue_token, |usage_queue| {
                let _ = usage_queue.unlock(task, RequestedUsage::Writable);
            });
    }

    #[test_matrix([Capability::FifoQueueing, Capability::PriorityQueueing])]
    #[should_panic(
        expected = "assertion failed: `Readonly` does not match `RequestedUsage::Writable`"
    )]
    fn test_unreachable_unlock_conditions2(capability: Capability) {
        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
        };
        let usage_queue = UsageQueue::new(&capability);
        let sanitized = simplest_transaction();
        let task = &SchedulingStateMachine::create_task(sanitized, 3, &mut |_| {
            UsageQueue::new(&capability)
        });
        usage_queue
            .0
            .with_borrow_mut(&mut state_machine.usage_queue_token, |usage_queue| {
                usage_queue.update_current_usage(RequestedUsage::Writable, task);
                let _ = usage_queue.unlock(task, RequestedUsage::Readonly);
            });
    }

    #[test_matrix([Capability::FifoQueueing, Capability::PriorityQueueing])]
    #[should_panic(expected = "internal error: entered unreachable code")]
    fn test_unreachable_unlock_conditions3(capability: Capability) {
        let mut state_machine = unsafe {
            SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
        };
        let usage_queue = UsageQueue::new(&capability);
        let sanitized = simplest_transaction();
        let task = &SchedulingStateMachine::create_task(sanitized, 3, &mut |_| {
            UsageQueue::new(&capability)
        });
        usage_queue
            .0
            .with_borrow_mut(&mut state_machine.usage_queue_token, |usage_queue| {
                usage_queue.update_current_usage(RequestedUsage::Readonly, task);
                let _ = usage_queue.unlock(task, RequestedUsage::Writable);
            });
    }

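    // Priority-capability specific scenarios: a lower-id (higher-priority) task arrives after a
    // lower-priority task already holds or awaits the contended address, and must be scheduled
    // first, reblocking the earlier holder where necessary.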
    mod reblocking {
        use super::{RequestedUsage::*, *};

        #[track_caller]
        fn assert_task_index(actual: Option<Task>, expected: Option<OrderedTaskId>) {
            assert_eq!(actual.map(|task| task.task_id()), expected);
        }

        macro_rules! assert_task_index {
            ($left:expr, $right:expr) => {
                assert_task_index($left, $right);
            };
        }

        fn setup() -> (
            SchedulingStateMachine,
            impl FnMut((RequestedUsage, Pubkey), OrderedTaskId) -> Task,
            Task,
        ) {
            let mut state_machine = unsafe {
                SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test()
            };

            let payer = Pubkey::new_unique();
            let mut address_loader = create_address_loader(None, &Capability::PriorityQueueing);

            let mut create_task = move |(requested_usage, address), task_id| match requested_usage {
                RequestedUsage::Readonly => SchedulingStateMachine::create_task(
                    transaction_with_readonly_address_with_payer(address, &payer),
                    task_id,
                    &mut address_loader,
                ),
                RequestedUsage::Writable => SchedulingStateMachine::create_task(
                    transaction_with_writable_address_with_payer(address, &payer),
                    task_id,
                    &mut address_loader,
                ),
            };

            let t0_block_others = create_task((Writable, Pubkey::new_unique()), 100);
            assert_task_index!(
                state_machine.schedule_task(t0_block_others.clone()),
                Some(100)
            );

            (state_machine, create_task, t0_block_others)
        }

        #[test]
        fn test_reblocked_tasks_lower_write_then_higher_write() {
            let (mut s, mut create_task, t0_block_others) = setup();

            let reblocked_address = Pubkey::new_unique();
            let t1_reblocked = create_task((Writable, reblocked_address), 102);
            let t2_force_locked = create_task((Writable, reblocked_address), 10);

            assert_task_index!(s.schedule_task(t1_reblocked.clone()), None);
            assert_task_index!(s.schedule_task(t2_force_locked.clone()), None);

            s.deschedule_task(&t0_block_others);
            assert_task_index!(s.schedule_next_unblocked_task(), Some(10));
            s.deschedule_task(&t2_force_locked);
            assert_task_index!(s.schedule_next_unblocked_task(), Some(102));
            s.deschedule_task(&t1_reblocked);
            assert!(s.has_no_active_task());
        }

        #[test]
        fn test_reblocked_tasks_lower_write_then_higher_read() {
            let (mut s, mut create_task, t0_block_others) = setup();

            let reblocked_address = Pubkey::new_unique();
            let t1_reblocked = create_task((Writable, reblocked_address), 102);
            let t2_force_locked = create_task((Readonly, reblocked_address), 10);

            assert_task_index!(s.schedule_task(t1_reblocked.clone()), None);
            assert_task_index!(s.schedule_task(t2_force_locked.clone()), None);

            s.deschedule_task(&t0_block_others);
            assert_task_index!(s.schedule_next_unblocked_task(), Some(10));
            s.deschedule_task(&t2_force_locked);
            assert_task_index!(s.schedule_next_unblocked_task(), Some(102));
            s.deschedule_task(&t1_reblocked);
            assert!(s.has_no_active_task());
        }

        #[test]
        fn test_reblocked_tasks_lower_read_then_higher_read() {
            let (mut s, mut create_task, t0_block_others) = setup();

            let reblocked_address = Pubkey::new_unique();
            let t1_not_reblocked = create_task((Readonly, reblocked_address), 102);
            let t2_skipped = create_task((Writable, reblocked_address), 103);
            let t3_force_locked = create_task((Readonly, reblocked_address), 10);

            assert_task_index!(s.schedule_task(t1_not_reblocked.clone()), None);
            assert_task_index!(s.schedule_task(t2_skipped.clone()), None);
            assert_task_index!(s.schedule_task(t3_force_locked.clone()), None);

            s.deschedule_task(&t0_block_others);
            assert_task_index!(s.schedule_next_unblocked_task(), Some(10));
            s.deschedule_task(&t3_force_locked);
            assert_task_index!(s.schedule_next_unblocked_task(), Some(102));
            s.deschedule_task(&t1_not_reblocked);
            assert_task_index!(s.schedule_next_unblocked_task(), Some(103));
            s.deschedule_task(&t2_skipped);
            assert!(s.has_no_active_task());
        }

        #[test]
        fn test_reblocked_tasks_lower_read_then_higher_write_full() {
            let (mut s, mut create_task, t0_block_others) = setup();

            let reblocked_address = Pubkey::new_unique();
            let t1_reblocked = create_task((Readonly, reblocked_address), 102);
            let t2_force_locked = create_task((Writable, reblocked_address), 10);

            assert_task_index!(s.schedule_task(t1_reblocked.clone()), None);
            assert_task_index!(s.schedule_task(t2_force_locked.clone()), None);

            s.deschedule_task(&t0_block_others);
            assert_task_index!(s.schedule_next_unblocked_task(), Some(10));
            s.deschedule_task(&t2_force_locked);
            assert_task_index!(s.schedule_next_unblocked_task(), Some(102));
            s.deschedule_task(&t1_reblocked);
            assert!(s.has_no_active_task());
        }
    }
}