#![warn(missing_debug_implementations, rust_2018_idioms, missing_docs)]

//! A concurrent work-stealing queue, useful for building schedulers.

use std::cmp;
use std::collections::hash_map::DefaultHasher;
use std::fmt::{self, Debug, Formatter};
use std::hash::Hasher;
use std::iter::FusedIterator;
use std::mem::{self, MaybeUninit};
use std::ops::Deref;
use std::ptr::{self, NonNull};
#[cfg(not(loom))]
use std::{collections::hash_map::RandomState, hash::BuildHasher};

// `facade` resolves to `loom.rs` when testing under Loom and to `std.rs`
// otherwise; it provides the synchronization primitives and the global queue
// implementation.
#[cfg_attr(loom, path = "loom.rs")]
#[cfg_attr(not(loom), path = "std.rs")]
mod facade;
use facade::atomic::{self, AtomicBool, AtomicU16, AtomicU32, AtomicUsize};
use facade::Arc;
use facade::GlobalQueue;
use facade::UnsafeCell;

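/// A work-stealing queue of items of type `T`.
///
/// Items are pushed either onto the global queue (via [`Queue::push`]) or onto
/// a worker's [`LocalQueue`]; workers pop from their own queue first and fall
/// back to the global queue and to stealing from sibling queues.
///
/// A minimal usage sketch (`ignore`d because `your_crate` stands in for this
/// crate's actual name):
///
/// ```ignore
/// use your_crate::Queue;
///
/// let queue: Queue<i32> = Queue::new(2, 64);
/// queue.push(1);
///
/// let mut locals: Vec<_> = queue.local_queues().collect();
/// assert_eq!(locals[0].pop(), Some(1));
/// ```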
#[derive(Debug)]
pub struct Queue<T>(Arc<Shared<T>>);

impl<T> Queue<T> {
    /// Creates a new queue with `local_queues` local queues, each with space
    /// for `local_queue_size` items.
    ///
    /// # Panics
    ///
    /// Panics if `local_queue_size` is not a power of two.
    pub fn new(local_queues: usize, local_queue_size: u16) -> Self {
        assert_eq!(
            local_queue_size.count_ones(),
            1,
            "Queue size is not a power of two"
        );
        let mask = local_queue_size - 1;

        Self(Arc::new(Shared {
            local_queues: (0..local_queues)
                .map(|_| LocalQueueInner {
                    heads: AtomicU32::new(0),
                    tail: AtomicU16::new(0),
                    mask,
                    items: (0..local_queue_size)
                        .map(|_| UnsafeCell::new(MaybeUninit::uninit()))
                        .collect(),
                })
                .collect(),
            global_queue: GlobalQueue::new(),
            stealing_global: AtomicBool::new(false),
            taken_local_queues: AtomicBool::new(false),
            searchers: AtomicUsize::new(0),
        }))
    }

    /// Pushes an item to the global queue, making it available to all workers.
    pub fn push(&self, item: T) {
        let _ = self.0.global_queue.push(item);
    }

    /// Returns an iterator over the local queues of this queue.
    ///
    /// # Panics
    ///
    /// Panics if the local queues have already been taken; they can only be
    /// taken once per queue.
    pub fn local_queues(&self) -> LocalQueues<'_, T> {
        assert!(!self
            .0
            .taken_local_queues
            .swap(true, atomic::Ordering::Relaxed));

        LocalQueues {
            shared: self,
            index: 0,
            #[cfg(not(loom))]
            hasher: RandomState::new().build_hasher(),
            #[cfg(loom)]
            hasher: DefaultHasher::new(),
        }
    }

    /// Returns the number of local queues that are currently searching for
    /// work in the global queue or in sibling queues.
    #[must_use]
    pub fn searchers(&self) -> usize {
        self.0.searchers.load(atomic::Ordering::Relaxed)
    }
}

impl<T> Clone for Queue<T> {
    fn clone(&self) -> Self {
        Self(Arc::clone(&self.0))
    }
}

/// The state shared by all handles to the queue.
#[derive(Debug)]
struct Shared<T> {
    /// The per-worker ring buffers.
    local_queues: Box<[LocalQueueInner<T>]>,
    /// The global queue, used by `Queue::push` and as the overflow target for
    /// full local queues.
    global_queue: GlobalQueue<T>,
    /// Set while a worker is taking items from the global queue, so that only
    /// one local queue steals from it at a time.
    stealing_global: AtomicBool,
    /// Whether the local queues have already been handed out via
    /// `Queue::local_queues`.
    taken_local_queues: AtomicBool,
    /// The number of local queues currently searching for work.
    searchers: AtomicUsize,
}

/// The fixed-size ring buffer behind each local queue.
struct LocalQueueInner<T> {
    /// The two heads of the ring packed into one atomic: the high 16 bits hold
    /// the "stealer" head, which protects slots currently being read by a
    /// stealer, and the low 16 bits hold the real head, the next item to pop.
    heads: AtomicU32,

    /// The tail of the ring: the slot the next pushed item will be written to.
    tail: AtomicU16,

    /// Bitmask applied to heads and tails to obtain an index into `items`;
    /// equal to `items.len() - 1`.
    mask: u16,

    /// The ring's storage.
    items: Box<[UnsafeCell<MaybeUninit<T>>]>,
}

// SAFETY: access to the `UnsafeCell`s is synchronized through `heads` and `tail`.
unsafe impl<T: Send> Sync for LocalQueueInner<T> {}

impl<T> Debug for LocalQueueInner<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let (protected_head, head) = unpack_heads(self.heads.load(atomic::Ordering::Acquire));

        f.debug_struct("LocalQueueInner")
            .field("protected_head", &protected_head)
            .field("head", &head)
            .field("tail", &self.tail)
            .field("mask", &format_args!("{:#b}", self.mask))
            .finish()
    }
}

/// Splits the packed `heads` value into `(stealer head, real head)`.
fn unpack_heads(heads: u32) -> (u16, u16) {
    ((heads >> 16) as u16, heads as u16)
}
/// Packs the stealer head and the real head into a single `u32`.
fn pack_heads(stealer: u16, real: u16) -> u32 {
    (stealer as u32) << 16 | real as u32
}

/// One worker's handle to the queue.
///
/// It owns a LIFO slot and one of the ring buffers, and falls back to the
/// global queue and to stealing from sibling queues when it runs out of local
/// items.
#[derive(Debug)]
pub struct LocalQueue<T> {
    /// The most recently pushed item, popped again before anything else.
    lifo_slot: Option<T>,
    /// This worker's ring buffer inside `shared`; the pointer stays valid
    /// because `shared` keeps the `Shared` allocation alive.
    local: ValidPtr<LocalQueueInner<T>>,
    shared: Queue<T>,
    rng: Rng,
}

impl<T> LocalQueue<T> {
    /// Loads the tail of the local ring. Only this handle ever writes the tail,
    /// so an unsynchronized load is sufficient.
    fn local_tail(&mut self) -> u16 {
        unsafe { facade::atomic_u16_unsync_load(&self.local.tail) }
    }

    /// Pushes an item to this local queue.
    ///
    /// The item is placed in the LIFO slot; whatever was in the slot before is
    /// pushed onto the local ring (or the global queue if the ring is full).
    pub fn push(&mut self, item: T) {
        if let Some(previous) = self.lifo_slot.replace(item) {
            self.push_yield(previous);
        }
    }

    /// Pushes an item to this local queue, bypassing the LIFO slot.
    ///
    /// `push` uses this to move the previous occupant of the LIFO slot into the
    /// ring; it can also be called directly when the item should not jump ahead
    /// of existing work.
    pub fn push_yield(&mut self, item: T) {
        let tail = self.local_tail();

        let mut heads = self.local.heads.load(atomic::Ordering::Acquire);

        loop {
            let (steal_head, head) = unpack_heads(heads);

            // If the ring has space left (counted from the stealer head, so we
            // never overwrite slots a stealer is still reading), write the item
            // at the tail and publish it.
            if tail.wrapping_sub(steal_head) < self.local.items.len() as u16 {
                let i = tail & self.local.mask;

                self.local.items[usize::from(i)]
                    .with_mut(|slot| unsafe { slot.write(MaybeUninit::new(item)) });

                self.local
                    .tail
                    .store(tail.wrapping_add(1), atomic::Ordering::Release);

                return;
            }

            // The ring is full. If no stealer is active, move half of the ring
            // to the global queue to make room.
            if steal_head == head {
                let half = self.local.items.len() as u16 / 2;
                let res = self.local.heads.compare_exchange(
                    heads,
                    pack_heads(head.wrapping_add(half), head.wrapping_add(half)),
                    atomic::Ordering::AcqRel,
                    atomic::Ordering::Acquire,
                );

                // A stealer got in first; retry with the updated heads.
                if let Err(new_heads) = res {
                    heads = new_heads;
                    continue;
                }

                for i in 0..half {
                    let index = head.wrapping_add(i) & self.local.mask;
                    let item = unsafe {
                        self.local.items[usize::from(index)]
                            .with(|slot| slot.read())
                            .assume_init()
                    };
                    let _ = self.shared.0.global_queue.push(item);
                }
            }

            // Either we just made room by spilling half the ring, or a stealer
            // is in the middle of taking items; in both cases the new item
            // itself goes to the global queue.
            let _ = self.shared.0.global_queue.push(item);

            return;
        }
    }

    /// Pops an item from this local queue.
    ///
    /// Items are taken from the LIFO slot first, then from the local ring
    /// buffer, then from the global queue, and finally by stealing from sibling
    /// local queues.
    pub fn pop(&mut self) -> Option<T> {
        // Try the LIFO slot first.
        if let Some(item) = self.lifo_slot.take() {
            return Some(item);
        }

        let tail = self.local_tail();

        // Try to advance the real head of our ring to claim an item.
        let res = atomic_u32_fetch_update(
            &self.local.heads,
            atomic::Ordering::Relaxed,
            atomic::Ordering::Relaxed,
            |heads| {
                let (steal_head, head) = unpack_heads(heads);
                if head == tail {
                    // The ring is empty.
                    None
                } else if steal_head == head {
                    // No stealer is active: advance both heads together.
                    Some(pack_heads(head.wrapping_add(1), head.wrapping_add(1)))
                } else {
                    // A stealer is active: advance only the real head.
                    Some(pack_heads(steal_head, head.wrapping_add(1)))
                }
            },
        );

        let heads = match res {
            Ok(heads) => {
                // We claimed the item at the old head; read it out of the ring.
                let (_, head) = unpack_heads(heads);
                let i = head & self.local.mask;
                return Some(unsafe {
                    self.local.items[usize::from(i)]
                        .with(|ptr| ptr.read())
                        .assume_init()
                });
            }
            Err(heads) => heads,
        };
        // The local ring was empty; go searching for work elsewhere.
        let (steal_head, head) = unpack_heads(heads);
        assert_eq!(head, tail);

        // How many free slots our ring has, counted from the stealer head so we
        // never overwrite slots that are still being read.
        let space = self.local.items.len() as u16 - head.wrapping_sub(steal_head);

        // Announce that we are searching for work.
        self.shared
            .0
            .searchers
            .fetch_add(1, atomic::Ordering::Relaxed);

        // Make sure the searcher count is decremented however we exit.
        struct DecrementSearchers<'a>(&'a AtomicUsize);
        impl Drop for DecrementSearchers<'_> {
            fn drop(&mut self) {
                self.0.fetch_sub(1, atomic::Ordering::Relaxed);
            }
        }
        let _decrement_searchers = DecrementSearchers(&self.shared.0.searchers);

        // If no other worker is currently taking items from the global queue,
        // try to take some ourselves.
        if self
            .shared
            .0
            .stealing_global
            .compare_exchange(
                false,
                true,
                atomic::Ordering::Relaxed,
                atomic::Ordering::Relaxed,
            )
            .is_ok()
        {
            let popped_item = self.shared.0.global_queue.pop();

            if popped_item.is_some() {
                // Refill up to half of our ring from the global queue so we
                // don't have to search again immediately.
                let steal = cmp::min(self.local.items.len() as u16 / 2, space);
                let mut tail = head;
                let end_tail = head.wrapping_add(steal);

                // Make sure any stealers that freed up these slots have
                // finished reading them before we write over them.
                u32_acquire_fence(&self.local.heads);

                while tail != end_tail {
                    match self.shared.0.global_queue.pop() {
                        Some(item) => {
                            let i = tail & self.local.mask;
                            self.local.items[usize::from(i)]
                                .with_mut(|slot| unsafe { slot.write(MaybeUninit::new(item)) });
                        }
                        None => break,
                    }
                    tail = tail.wrapping_add(1);
                }
                self.local.tail.store(tail, atomic::Ordering::Release);
            }

            self.shared
                .0
                .stealing_global
                .store(false, atomic::Ordering::Relaxed);

            if let Some(popped_item) = popped_item {
                return Some(popped_item);
            }
        }

        // Try to steal from a sibling local queue, starting at a random index
        // so the same sibling isn't always tried first.
        let queues = self.shared.0.local_queues.len();
        let start = self.rng.gen_usize_to(queues);

        'sibling_queues: for i in 0..queues {
            let mut i = start + i;
            if i >= queues {
                i -= queues;
            }

            let queue = &self.shared.0.local_queues[i];
            if ptr::eq(queue, &*self.local) {
                continue;
            }

            let mut queue_heads = queue.heads.load(atomic::Ordering::Acquire);

            let (old_queue_head, mut queue_head, steal) = loop {
                let (queue_steal_head, queue_head) = unpack_heads(queue_heads);

                // Someone else is already stealing from this queue; try the
                // next one.
                if queue_steal_head != queue_head {
                    continue 'sibling_queues;
                }

                let queue_tail = queue.tail.load(atomic::Ordering::Acquire);

                let stealable = queue_tail.wrapping_sub(queue_head);

                // Steal half of the queue's items (rounded up), but no more
                // than we have space for.
                let steal = cmp::min(stealable - stealable / 2, space);

                if steal == 0 {
                    continue 'sibling_queues;
                }

                let new_queue_head = queue_head.wrapping_add(steal);

                // Claim the range `[queue_head, new_queue_head)` by advancing
                // the real head while leaving the stealer head behind to
                // protect it.
                let res = queue.heads.compare_exchange_weak(
                    queue_heads,
                    pack_heads(queue_head, new_queue_head),
                    atomic::Ordering::Acquire,
                    atomic::Ordering::Acquire,
                );

                match res {
                    Ok(_) => break (queue_head, new_queue_head, steal),
                    Err(updated_queue_heads) => queue_heads = updated_queue_heads,
                }
            };

            assert_ne!(steal, 0);

            // Make sure any stealers that freed up our destination slots have
            // finished reading them before we write over them.
            u32_acquire_fence(&self.local.heads);

            // The first stolen item is returned directly rather than being put
            // in our ring.
            let first_item = unsafe {
                queue.items[usize::from(old_queue_head & queue.mask)]
                    .with(|slot| slot.read())
                    .assume_init()
            };

            // Copy the remaining stolen items into our own ring.
            for i in 1..steal {
                let src = &queue.items[usize::from(old_queue_head.wrapping_add(i) & queue.mask)];
                let dst =
                    &self.local.items[usize::from(head.wrapping_add(i - 1) & self.local.mask)];

                src.with(|src| dst.with_mut(|dst| unsafe { src.copy_to_nonoverlapping(dst, 1) }))
            }

            // Release the claimed range by catching the stealer head up to the
            // real head. The real head may have advanced in the meantime, so
            // retry with the updated value if the exchange fails.
            loop {
                let res = queue.heads.compare_exchange_weak(
                    pack_heads(old_queue_head, queue_head),
                    pack_heads(queue_head, queue_head),
                    atomic::Ordering::Release,
                    atomic::Ordering::Relaxed,
                );

                match res {
                    Ok(_) => break,
                    Err(updated_queue_heads) => {
                        let (updated_queue_steal_head, updated_queue_head) =
                            unpack_heads(updated_queue_heads);
                        assert_eq!(updated_queue_steal_head, old_queue_head);
                        queue_head = updated_queue_head;
                    }
                }
            }

            // Publish the items we copied into our ring.
            if steal > 1 {
                self.local
                    .tail
                    .store(tail.wrapping_add(steal - 1), atomic::Ordering::Release);
            }

            return Some(first_item);
        }

        // As a last resort, pop directly from the global queue.
        self.shared.0.global_queue.pop()
    }

    /// Returns the number of local queues that are currently searching for
    /// work in the global queue or in sibling queues.
    #[must_use]
    pub fn searchers(&self) -> usize {
        self.shared.searchers()
    }

    /// Returns a reference to the global queue this local queue belongs to.
    #[must_use]
    pub fn global(&self) -> &Queue<T> {
        &self.shared
    }
}

/// An iterator over the local queues of a [`Queue`], created by
/// [`Queue::local_queues`]. Yields one [`LocalQueue`] per local queue.
#[derive(Debug)]
#[must_use = "iterators are lazy and do nothing unless consumed"]
pub struct LocalQueues<'a, T> {
    shared: &'a Queue<T>,
    index: usize,
    hasher: DefaultHasher,
}

impl<T> Iterator for LocalQueues<'_, T> {
    type Item = LocalQueue<T>;

    fn next(&mut self) -> Option<Self::Item> {
        let inner = self.shared.0.local_queues.get(self.index)?;
        self.index += 1;

        Some(LocalQueue {
            lifo_slot: None,
            // SAFETY: `inner` points into the `Shared` allocation, which is
            // kept alive by the `shared` handle stored alongside it.
            local: unsafe { ValidPtr::new(inner) },
            shared: self.shared.clone(),
            rng: Rng {
                // Derive a different RNG seed for each local queue from the
                // shared hasher.
                state: {
                    self.hasher.write_usize(self.index);
                    self.hasher.finish()
                },
            },
        })
    }
    fn size_hint(&self) -> (usize, Option<usize>) {
        let len = self.len();
        (len, Some(len))
    }
}

impl<T> ExactSizeIterator for LocalQueues<'_, T> {
    fn len(&self) -> usize {
        self.shared.0.local_queues.len() - self.index
    }
}

impl<T> FusedIterator for LocalQueues<'_, T> {}

/// A raw pointer whose creator guarantees it to remain valid for shared access
/// for as long as the `ValidPtr` exists.
struct ValidPtr<T: ?Sized>(NonNull<T>);
impl<T: ?Sized> ValidPtr<T> {
    unsafe fn new(ptr: *const T) -> Self {
        Self(NonNull::new_unchecked(ptr as *mut T))
    }
}
impl<T: ?Sized> Clone for ValidPtr<T> {
    fn clone(&self) -> Self {
        *self
    }
}
impl<T: ?Sized> Copy for ValidPtr<T> {}
impl<T: ?Sized> Deref for ValidPtr<T> {
    type Target = T;
    fn deref(&self) -> &Self::Target {
        unsafe { self.0.as_ref() }
    }
}
impl<T: ?Sized + Debug> Debug for ValidPtr<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        T::fmt(self, f)
    }
}
unsafe impl<T: ?Sized + Sync> Send for ValidPtr<T> {}
unsafe impl<T: ?Sized + Sync> Sync for ValidPtr<T> {}

/// An integer twice the width of `usize`, used for the widening multiply in
/// `Rng::gen_usize_to`.
#[cfg(target_pointer_width = "64")]
type DoubleUsize = u128;
#[cfg(target_pointer_width = "32")]
type DoubleUsize = u64;

/// A small, fast, non-cryptographic PRNG (wyrand-style) used to pick which
/// sibling queue to start stealing from.
#[derive(Debug)]
struct Rng {
    state: u64,
}
impl Rng {
    fn gen_u64(&mut self) -> u64 {
        self.state = self.state.wrapping_add(0xA0761D6478BD642F);
        let t = u128::from(self.state) * u128::from(self.state ^ 0xE7037ED1A0B428DB);
        (t >> 64) as u64 ^ t as u64
    }
    fn gen_usize(&mut self) -> usize {
        self.gen_u64() as usize
    }
    /// Generates a uniformly distributed number in `0..to` using a widening
    /// multiply with rejection (Lemire's method), avoiding modulo bias.
    fn gen_usize_to(&mut self, to: usize) -> usize {
        const USIZE_BITS: usize = mem::size_of::<usize>() * 8;

        let mut x = self.gen_usize();
        let mut m = ((x as DoubleUsize * to as DoubleUsize) >> USIZE_BITS) as usize;
        let mut l = x.wrapping_mul(to);
        if l < to {
            // Reject the few values that would make the result non-uniform and
            // draw again.
            let t = to.wrapping_neg() % to;
            while l < t {
                x = self.gen_usize();
                m = ((x as DoubleUsize * to as DoubleUsize) >> USIZE_BITS) as usize;
                l = x.wrapping_mul(to);
            }
        }
        m
    }
}

/// Equivalent of `AtomicU32::fetch_update`, implemented as a CAS loop: applies
/// `f` until it succeeds, returning `Ok` with the previous value, or `Err` with
/// the latest value once `f` returns `None`.
fn atomic_u32_fetch_update<F>(
    atomic: &AtomicU32,
    set_order: atomic::Ordering,
    fetch_order: atomic::Ordering,
    mut f: F,
) -> Result<u32, u32>
where
    F: FnMut(u32) -> Option<u32>,
{
    let mut prev = atomic.load(fetch_order);
    while let Some(next) = f(prev) {
        match atomic.compare_exchange_weak(prev, next, set_order, fetch_order) {
            Ok(x) => return Ok(x),
            Err(next_prev) => prev = next_prev,
        }
    }
    Err(prev)
}

/// Issues an acquire fence associated with `atomic`. ThreadSanitizer does not
/// understand standalone fences, so under `cfg(tsan)` an acquire load of the
/// atomic is performed instead.
fn u32_acquire_fence(atomic: &AtomicU32) {
    if cfg!(tsan) {
        atomic.load(atomic::Ordering::Acquire);
    } else {
        atomic::fence(atomic::Ordering::Acquire);
    }
}

#[cfg(all(test, not(loom)))]
mod tests {
    use super::*;

    use std::collections::HashSet;

    #[test]
    fn rng() {
        let mut rng = Rng { state: 3493858 };

        let mut remaining: HashSet<_> = (0..15).collect();

        while !remaining.is_empty() {
            let value = rng.gen_usize_to(15);
            assert!(value < 15, "{} is not less than 15!", value);
            remaining.remove(&value);
        }
    }

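    // The `heads` atomic packs two 16-bit indices: the stealer head in the high
    // half and the real head in the low half. A quick round-trip check of the
    // packing helpers:
    #[test]
    fn heads_packing() {
        assert_eq!(pack_heads(2, 5), 0x0002_0005);
        assert_eq!(unpack_heads(pack_heads(2, 5)), (2, 5));
        assert_eq!(unpack_heads(pack_heads(u16::MAX, 0)), (u16::MAX, 0));
    }
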
    #[test]
    fn lifo_slot() {
        let queue = Queue::new(1, 2);
        let mut local = queue.local_queues().next().unwrap();

        assert_eq!(local.pop(), None);
        assert_eq!(local.pop(), None);

        local.push(Box::new(5));
        assert_eq!(local.pop(), Some(Box::new(5)));
        assert_eq!(local.pop(), None);
    }

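    // `LocalQueue::global` hands back the `Queue` this local queue was created
    // from, so an item pushed through it is popped like any other globally
    // queued item. A minimal sketch of that round trip:
    #[test]
    fn global_accessor() {
        let queue = Queue::new(1, 2);
        let mut local = queue.local_queues().next().unwrap();

        local.global().push(Box::new(7));
        assert_eq!(local.pop(), Some(Box::new(7)));
        assert_eq!(local.pop(), None);
    }
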
    #[test]
    fn push_many() {
        let queue = Queue::new(1, 2);
        let mut local = queue.local_queues().next().unwrap();

        for i in 0..4 {
            local.push(Box::new(i));
        }
        assert_eq!(local.pop(), Some(Box::new(3)));
        assert_eq!(local.pop(), Some(Box::new(1)));
        assert_eq!(local.pop(), Some(Box::new(0)));
        assert_eq!(local.pop(), Some(Box::new(2)));
        assert_eq!(local.pop(), None);
    }

    #[test]
    fn wrapping() {
        let queue = Queue::new(1, 2);
        let mut local = queue.local_queues().next().unwrap();

        local.push_yield(Box::new(0));

        for i in 0..10 {
            local.push_yield(Box::new(i + 1));

            assert_eq!(local.pop(), Some(Box::new(i)));
        }

        assert_eq!(local.pop(), Some(Box::new(10)));
        assert_eq!(local.pop(), None);
        assert_eq!(local.pop(), None);
    }

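    // Unlike `push`, `push_yield` bypasses the LIFO slot, so items pushed with
    // it come back out oldest-first. A small sketch of that behavior:
    #[test]
    fn push_yield_is_fifo() {
        let queue = Queue::new(1, 4);
        let mut local = queue.local_queues().next().unwrap();

        local.push_yield(Box::new(0));
        local.push_yield(Box::new(1));

        assert_eq!(local.pop(), Some(Box::new(0)));
        assert_eq!(local.pop(), Some(Box::new(1)));
        assert_eq!(local.pop(), None);
    }
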
    #[test]
    fn steal_global() {
        for &size in &[2, 4, 8, 16, 32, 64] {
            let queue = Queue::new(4, size);

            for i in 0..16 {
                queue.push(Box::new(i));
            }

            let mut local = queue.local_queues().next().unwrap();

            for i in 0..16 {
                assert_eq!(local.pop(), Some(Box::new(i)));
            }

            assert_eq!(local.pop(), None);
        }
    }

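    // `Queue::new` requires the local queue size to be a power of two, since
    // ring indices are computed with a bitmask; a sketch of the resulting panic:
    #[test]
    #[should_panic(expected = "power of two")]
    fn size_must_be_power_of_two() {
        <Queue<()>>::new(1, 3);
    }
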
    #[test]
    fn steal_siblings() {
        let queue = Queue::new(2, 64);

        let mut locals: Vec<_> = queue.local_queues().collect();

        locals[0].push_yield(Box::new(4));
        locals[0].push_yield(Box::new(5));

        locals[1].push(Box::new(1));
        locals[1].push(Box::new(0));

        queue.push(Box::new(2));
        queue.push(Box::new(3));

        for i in 0..6 {
            assert_eq!(locals[1].pop(), Some(Box::new(i)));
        }
    }

    #[test]
    fn many_locals() {
        let queue = <Queue<()>>::new(10, 128);
        queue.local_queues().for_each(drop);
    }

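    // The local queues of a `Queue` can only be taken once; a second call to
    // `local_queues` panics. A small sketch of that restriction:
    #[test]
    #[should_panic]
    fn local_queues_taken_once() {
        let queue = <Queue<()>>::new(1, 2);
        queue.local_queues().for_each(drop);
        queue.local_queues().for_each(drop);
    }
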
    #[test]
    fn searchers() {
        let queue = Queue::new(2, 64);
        let mut locals = queue.local_queues();
        let mut local_a = locals.next().unwrap();
        let mut local_b = locals.next().unwrap();

        assert_eq!(local_a.searchers(), 0);
        assert_eq!(local_b.searchers(), 0);

        local_a.push(());
        local_a.push(());
        local_a.pop().unwrap();
        local_a.pop().unwrap();
        queue.push(());
        local_b.pop().unwrap();
        assert!(local_b.pop().is_none());

        assert_eq!(local_a.searchers(), 0);
        assert_eq!(local_b.searchers(), 0);

        // Under Miri, skip the part that spawns a thread spinning on `pop`.
        if cfg!(not(miri)) {
            let stop = Arc::new(AtomicBool::new(false));

            let handle = std::thread::spawn({
                let stop = Arc::clone(&stop);
                move || {
                    while !stop.load(atomic::Ordering::Relaxed) {
                        local_b.pop();
                    }
                }
            });

            loop {
                let searchers = local_a.searchers();
                assert!(searchers < 2);
                if searchers == 1 {
                    break;
                }
            }

            stop.store(true, atomic::Ordering::Relaxed);
            handle.join().unwrap();
        }
    }

    #[test]
    fn stress() {
        let queue = Queue::new(4, 4);

        if cfg!(miri) {
            for _ in 0..3 {
                queue.push(4);
            }
        } else {
            for _ in 0..32 {
                queue.push(6);
            }
        }

        let threads: Vec<_> = queue
            .local_queues()
            .map(|mut queue| {
                std::thread::spawn(move || {
                    while let Some(num) = queue.pop() {
                        for _ in 0..num {
                            queue.push(num - 1);
                        }
                    }
                })
            })
            .collect();

        for thread in threads {
            thread.join().unwrap();
        }
    }

    // Randomized concurrency test driven by the `cobb` test harness.
    #[test]
    fn cobb() {
        use std::cell::UnsafeCell;

        struct State(Option<Box<[UnsafeCell<LocalQueue<Box<i32>>>]>>);
        unsafe impl Sync for State {}

        cobb::run_test(cobb::TestCfg {
            threads: 4,
            iterations: if cfg!(miri) { 100 } else { 1000 },
            sub_iterations: if cfg!(miri) { 1 } else { 10 },
            setup: || {
                let queue = Queue::new(4, 4);
                State(Some(
                    queue
                        .local_queues()
                        .map(UnsafeCell::new)
                        .collect::<Box<[_]>>(),
                ))
            },
            test: |State(state), tctx| {
                let local_queues = state.as_ref().unwrap();
                let queue = unsafe { &mut *local_queues[tctx.thread_index()].get() };
                if tctx.thread_index() < 2 {
                    queue.push(Box::new(5));
                } else {
                    queue.pop();
                }
            },
            teardown: |state| *state = State(None),
            ..Default::default()
        });
    }
}

#[cfg(all(test, loom))]
mod loom_tests {
    use super::*;

    fn locals<T, const N: usize>(queue: &Queue<T>) -> [LocalQueue<T>; N] {
        array_init::from_iter(queue.local_queues()).expect("incorrect number of local queues")
    }

    #[test]
    fn pop_none() {
        loom::model(|| {
            let queue: Queue<()> = Queue::new(2, 1);
            let [mut local_1, mut local_2] = locals(&queue);
            loom::thread::spawn(move || assert!(local_1.pop().is_none()));
            assert!(local_2.pop().is_none());
        });
    }

    #[test]
    fn concurrent_steal_global() {
        loom::model(|| {
            let queue: Queue<Box<i32>> = Queue::new(2, 1);
            let [mut local_1, mut local_2] = locals(&queue);
            for i in 0..2 {
                queue.push(Box::new(i));
            }
            loom::thread::spawn(move || {
                local_1.pop();
                local_1.pop();
            });
            local_2.pop();
        });
    }

    #[test]
    fn concurrent_steal_sibling() {
        loom::model(|| {
            let queue: Queue<Box<i32>> = Queue::new(3, 1);
            let [mut local_1, mut local_2, mut local_3] = locals(&queue);
            for i in 0..4 {
                local_1.push(Box::new(i));
            }
            loom::thread::spawn(move || {
                local_2.pop();
                local_2.pop();
            });
            local_3.pop();
        });
    }

    #[test]
    fn global_spsc() {
        loom::model(|| {
            let queue: Queue<Box<i32>> = Queue::new(1, 4);
            let [mut local] = locals(&queue);
            loom::thread::spawn(move || {
                for i in 0..6 {
                    queue.push(Box::new(i));
                }
            });
            for _ in 0..6 {
                local.pop();
            }
        });
    }

    #[test]
    fn sibling_spsc_few() {
        loom::model(|| {
            let queue: Queue<Box<i32>> = Queue::new(2, 4);
            let [mut local_1, mut local_2] = locals(&queue);
            loom::thread::spawn(move || {
                for i in 0..4 {
                    local_1.push(Box::new(i));
                }
            });
            for _ in 0..4 {
                local_2.pop();
            }
        });
    }

    #[test]
    fn sibling_spsc_many() {
        loom::model(|| {
            let queue: Queue<Box<i32>> = Queue::new(2, 4);
            let [mut local_1, mut local_2] = locals(&queue);
            loom::thread::spawn(move || {
                for i in 0..8 {
                    local_1.push(Box::new(i));
                }
            });
            local_2.pop();
        });
    }
}