use std::{
fmt::Debug,
hint::black_box,
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
thread::{self},
time::Instant,
usize,
};
use crate::{BLOCK_LENGTH, ConfiguredUBQ, UBQ, align, backoff, ubq};
/// Checks that dropping a queue releases every value still stored in it.
///
/// A shared `Arc<()>` acts as the leak detector: its strong count must grow by
/// one per pushed clone and fall back to exactly one once the queue is dropped.
#[test]
fn drop_releases_all_enqueued_values() {
    let token = Arc::new(());
    // Presumably three full blocks plus a partial one (going by the constant's
    // name) — confirm against the queue implementation.
    let n = (BLOCK_LENGTH as usize * 3) + 7;

    // Repeat the whole cycle several times with a fresh queue each time.
    for _ in 0..16 {
        let q = UBQ::new();
        (0..n).for_each(|_| {
            q.push(Arc::clone(&token));
        });

        // One reference per queued clone, plus the local `token`.
        assert_eq!(Arc::strong_count(&token), n + 1);
        println!("q: {q:?}");

        drop(q);
        // Only the local `token` may remain alive after the drop.
        assert_eq!(Arc::strong_count(&token), 1);
    }
}
/// Pushes a long run of consecutive integers from a single thread and checks
/// that they are popped back in exactly the order they were pushed.
#[test]
fn fill_drain_ordered() {
    let queue = UBQ::new();
    let total = 1_000_000;

    // Fill phase: enqueue 0, 1, 2, ... total - 1.
    (0..total).for_each(|value| {
        queue.push(value);
    });

    // Drain phase: each pop must yield the next value in push order.
    for expected in 0..total {
        assert_eq!(queue.pop(), Some(expected));
    }
}
/// Repeatedly fills and fully drains the same queue, checking order within
/// each round and that the queue reports empty between rounds.
///
/// NOTE(review): the test name suggests this exercises block recycling inside
/// the queue; that is not observable from this test alone.
#[test]
fn refill_drain_recycled_blocks() {
    let queue = UBQ::new();
    let batch = BLOCK_LENGTH * 3 + 17;

    for round in 0..64 {
        // Tag every element with its round so stale values from an earlier
        // round would be detected by the equality check below.
        (0..batch).for_each(|slot| {
            queue.push((round, slot));
        });

        for slot in 0..batch {
            assert_eq!(queue.pop(), Some((round, slot)));
        }

        // The queue must be empty again before the next round starts.
        assert_eq!(queue.pop(), None);
    }
}
/// Multi-producer / multi-consumer stress test.
///
/// Eight producer threads each push the same number of `u8` values while eight
/// consumer threads each pop that same number. A shared flag tells consumers
/// whether producers may still be running:
///
/// * while the flag is set, an empty pop is tolerated and simply retried;
/// * once the flag is cleared, the next pop is asserted to succeed, because
///   the flag is only cleared after every producer has been joined.
///
/// The elapsed wall-clock time is printed at the end.
#[test]
fn mpmc() {
    let queue = UBQ::new_arc();
    let producing = Arc::new(AtomicBool::new(true));
    let started = Instant::now();
    let per_thread = 1_000_001;

    let mut producers = Vec::with_capacity(8);
    let mut consumers = Vec::with_capacity(8);

    // Spawn producer/consumer pairs in interleaved order.
    for _ in 0..8 {
        let producer_queue = queue.clone();
        producers.push(thread::spawn(move || {
            for i in 0..per_thread {
                producer_queue.push(black_box((i % u8::MAX as i32) as u8));
            }
        }));

        let consumer_flag = producing.clone();
        let consumer_queue = queue.clone();
        consumers.push(thread::spawn(move || {
            'items: for _ in 0..per_thread {
                // Producers may still be running: retry until a pop succeeds.
                while consumer_flag.load(Ordering::Acquire) {
                    if black_box(consumer_queue.pop()).is_some() {
                        continue 'items;
                    }
                }
                // Producers have all finished: an item is expected right away.
                assert!(black_box(consumer_queue.pop()).is_some());
            }
        }));
    }

    // Wait for every producer before telling consumers that production is over.
    for producer in producers {
        producer.join().unwrap();
    }
    producing.store(false, Ordering::Release);

    for consumer in consumers {
        consumer.join().unwrap();
    }
    println!("{:?}", started.elapsed());
}
/// Builds a `ConfiguredUBQ` with explicitly spelled-out parameters
/// (`backoff::Crossbeam`, `8`, `127`, `align::A256`) and checks that it still
/// behaves as a FIFO queue that reports empty after being drained.
#[test]
fn configured_queue_supports_non_default_pool_and_preset_block() {
    type Queue = ConfiguredUBQ<u64, backoff::Crossbeam, 8, 127, align::A256>;
    const COUNT: u64 = 10_000;

    let queue = Queue::new();

    (0..COUNT).for_each(|value| {
        queue.push(value);
    });

    for expected in 0..COUNT {
        assert_eq!(queue.pop(), Some(expected));
    }

    // Nothing may be left once every pushed value has been popped.
    assert_eq!(queue.pop(), None);
}
/// Builds a `ConfiguredUBQ` whose last type parameter is a locally defined,
/// 1024-byte-aligned marker type, and checks FIFO order plus emptiness after
/// draining.
#[test]
fn configured_queue_supports_arbitrary_block_with_explicit_alignment() {
    /// Zero-sized marker type carrying a 1024-byte alignment requirement.
    #[repr(align(1024))]
    #[derive(Clone, Copy, Debug, Default)]
    struct A1024;

    type Queue = ConfiguredUBQ<u64, backoff::Crossbeam, 2, 100, A1024>;
    const COUNT: u64 = 2_000;

    let queue = Queue::new();

    (0..COUNT).for_each(|value| {
        queue.push(value);
    });

    for expected in 0..COUNT {
        assert_eq!(queue.pop(), Some(expected));
    }

    // Nothing may be left once every pushed value has been popped.
    assert_eq!(queue.pop(), None);
}
/// Checks that `ubq!` given only an element type produces a value assignable
/// to `ConfiguredUBQ<u64>` (all other parameters left at their defaults) and
/// that a pushed value can be popped back.
#[test]
fn ubq_macro_defaults_to_public_alias() {
    let queue: ConfiguredUBQ<u64> = ubq!(type: u64);

    queue.push(9);
    let popped = queue.pop();

    assert_eq!(popped, Some(9));
}
/// Checks that `ubq!` honours explicit `backoff`, `pool` and `block`
/// arguments: the result must type-check against the fully spelled-out
/// `ConfiguredUBQ` type, and a pushed value must pop back out.
///
/// No `align:` argument is passed, so the annotation also pins the alignment
/// the macro selects by itself for this configuration to `align::A256`.
#[test]
fn ubq_macro_applies_explicit_overrides() {
    type Expected = ConfiguredUBQ<u64, backoff::Yield, 2, 127, align::A256>;

    let queue: Expected = ubq!(
        type: u64,
        backoff: backoff::Yield,
        pool: 2,
        block: 127,
    );

    queue.push(11);
    let popped = queue.pop();

    assert_eq!(popped, Some(11));
}
/// Checks that `ubq!` accepts a caller-defined alignment marker through its
/// `align:` argument: the result must type-check against the matching
/// `ConfiguredUBQ` type, and a pushed value must pop back out.
#[test]
fn ubq_macro_supports_custom_alignment_override() {
    /// Zero-sized marker type carrying a 1024-byte alignment requirement.
    #[repr(align(1024))]
    #[derive(Clone, Copy, Debug, Default)]
    struct A1024;

    type Expected = ConfiguredUBQ<u64, backoff::Crossbeam, 4, 100, A1024>;

    let queue: Expected = ubq!(
        type: u64,
        pool: 4,
        block: 100,
        align: A1024,
    );

    queue.push(13);
    let popped = queue.pop();

    assert_eq!(popped, Some(13));
}
/// Push-only contention run: eight threads each push one million integers into
/// a shared queue, and the total elapsed time is printed.
///
/// Nothing is popped or asserted; the test only requires that every thread
/// finishes without panicking.
#[test]
fn push_test() {
    let queue = UBQ::new_arc();
    let started = Instant::now();

    let mut workers = Vec::with_capacity(8);
    for _ in 0..8 {
        let worker_queue = queue.clone();
        workers.push(thread::spawn(move || {
            for value in 0..1_000_000 {
                worker_queue.push(black_box(value));
            }
        }));
    }

    // Propagate any worker panic into the test thread.
    for worker in workers {
        worker.join().unwrap();
    }

    println!("{:?}", started.elapsed());
}