// ubq/lib.rs
//! A lock-free, unbounded multi-producer/multi-consumer queue backed by a linked
//! ring of fixed-size blocks.
//!
//! # Overview
//!
//! [`UBQ<T>`] is a **lock-free MPMC queue** with no upper bound on capacity.
//! Elements are stored in fixed-size *blocks* that form a circular ring; blocks
//! are allocated on demand and recycled once fully consumed.
//!
//! Every [`UBQ<T>`] handle is cheaply clonable — all clones share the same
//! underlying ring and may call [`UBQ::push`] and [`UBQ::pop`] concurrently from
//! any thread. Reference counting ensures that the ring (and any unconsumed
//! elements) is freed when the last clone is dropped.
//!
//! Neither [`UBQ::push`] nor [`UBQ::pop`] ever parks the calling thread. Both
//! operations are *lock-free*: producers and consumers make progress independently.
//! The only spin-waits occur at block boundaries, where a consumer briefly waits
//! for in-flight producers to commit their writes before claiming a slot.
//!
//! # Quick start
//!
//! ```rust
//! use ubq::UBQ;
//! use std::thread;
//!
//! let q: UBQ<u64> = UBQ::new();
//! let q2 = q.clone();
//!
//! let producer = thread::spawn(move || {
//!     for i in 0..1_000_u64 {
//!         q2.push(i);
//!     }
//! });
//!
//! producer.join().unwrap();
//!
//! for i in 0..1_000_u64 {
//!     assert_eq!(q.pop(), Some(i));
//! }
//! assert_eq!(q.pop(), None); // queue is now empty
//! ```
//!
//! # Internal design
//!
//! The queue maintains two atomic head pointers — **phead** (producer head) and
//! **chead** (consumer head) — each pointing into a circular ring of blocks.
//! Within each block, packed counters track claimed and committed producer/consumer
//! slots. A consumer spins briefly on the *stability predicate* before claiming a
//! slot to guarantee it reads only fully-committed writes.
//!
//! Detailed correctness arguments for each invariant are annotated inline in the
//! source as `[C1]`–`[C6]`.
53
54#![warn(missing_docs)]
55
56use crossbeam_utils::{Backoff, CachePadded};
57use std::{
58 cell::UnsafeCell,
59 mem::MaybeUninit,
60 ptr::NonNull,
61 sync::atomic::{AtomicPtr, AtomicU64, AtomicUsize, Ordering},
62};
63
/// Atomic storage for a packed pair of counters (`high` = claimed, `low` = committed).
type A = AtomicU64;

/// Packed `F::BITS`-bit counter representation manipulated through [`A`].
/// Layout: upper `H::BITS` bits = claimed count, lower `H::BITS` bits = committed count.
///
/// NOTE: `F` must be the integer type stored inside [`A`], and [`H`] must be
/// exactly half its width — `merge`, `high`, and `low` all shift by `H::BITS`
/// and rely on this coupling.
type F = u64;
/// Single `H::BITS`-bit half of a packed [`F`] counter.
type H = u32;
72
/// A lock-free, unbounded multi-producer/multi-consumer (MPMC) queue.
///
/// See the [crate-level documentation](crate) for an overview and quick-start example.
///
/// # Cloning and ownership
///
/// `UBQ<T>` is cheaply clonable: every clone shares the same underlying block ring
/// via reference counting. All clones may call [`push`](Self::push) and
/// [`pop`](Self::pop) concurrently without additional coordination. The block ring
/// (including any unconsumed elements) is freed when the **last** clone is dropped.
///
/// # Implementation invariants
///
/// 1. **Block availability:** the first published block is immediately open to both
///    producers and consumers; each later block is initially open only to producers.
///
/// 2. **Stale head pointers fail fast:** a producer (resp. consumer) operating on a
///    block that is no longer `phead` (resp. `chead`) abandons that attempt, reloads
///    the head pointer, and retries.
///
/// 3. **Producer/consumer coordination per block:** `B::p` packs two counters —
///    `high(p)` = claimed slots (incremented before writing) and
///    `low(p)` = committed slots (incremented with `Release` after writing).
///    Before claiming a slot a consumer spin-waits until `high(p) == low(p)`
///    (all claims committed) or `low(p) == L` (block full). Full blocks use an
///    unconditional `fetch_add` on `B::c`; partial blocks use a `CAS` loop.
///    `B::c` starts at `F::MAX` ("closed to consumers") and is set to `0` by the
///    consumer that advances `chead`.
pub struct UBQ<T> {
    /// Atomic pointer to phead: the block currently accepting producer pushes.
    p: NonNull<CachePadded<AtomicPtr<B<T>>>>,
    /// Atomic pointer to chead: the block currently being drained by consumers.
    c: NonNull<CachePadded<AtomicPtr<B<T>>>>,
    /// Shared reference count. Incremented by [`Clone`] and decremented by [`Drop`];
    /// when it reaches zero the last clone frees the block ring and drops any
    /// unconsumed elements.
    n: NonNull<CachePadded<AtomicUsize>>,
}
111
112// SAFETY: All shared mutable state is accessed through `AtomicPtr` and [`A`]
113// operations, or through the `UnsafeCell` slots in `B::a`, which are protected by
114// the exclusive-index guarantee [C6] and the Release–Acquire pairing on `B::p` and
115// `B::c`. Given `T: Sync` (resp. `T: Send`), concurrent shared access (resp.
116// cross-thread transfer) of `T` values through `Q` is therefore sound.
117unsafe impl<T: Sync> Sync for UBQ<T> {}
118unsafe impl<T: Send> Send for UBQ<T> {}
119
/// Number of element slots per block.
const L: H = 32;
// `B::c` uses `F::MAX` as the "closed to consumers" sentinel, whose high and low
// halves are both `H::MAX`. The consumer logic distinguishes a closed block from
// a full one by comparing the unpacked halves against `L`, so `L` itself must
// never collide with the sentinel half-value.
const _: () = assert!(
    L != H::MAX,
    "L cannot be H::MAX, as this value is used as a sentinel."
);
126
/// A fixed-size ring-buffer segment.
///
/// `p` and `c` each pack two [`H`] counters into a [`A`] using the upper and lower
/// `H::BITS` bits respectively; see `high()`, `low()`, and `merge()`.
struct B<T> {
    /// Pointer to the next block in the ring.
    n: CachePadded<AtomicPtr<Self>>,
    /// Producer counter. `high(p)` = slots claimed; `low(p)` = slots committed.
    p: CachePadded<A>,
    /// Consumer counter. `high(c)` = slots claimed; `low(c)` = slots committed.
    /// The sentinel value `F::MAX` means the block is not yet open to consumers.
    c: CachePadded<A>,
    /// Element storage. Slot `i` is written by the unique producer that claims index
    /// `i` and read by the unique consumer that claims index `i`. See [C6].
    a: [UnsafeCell<MaybeUninit<T>>; L as usize],
}
143
144/// Returns the lower `H::BITS` bits of a packed counter (committed count).
145fn low(r: F) -> H {
146 r as H
147}
148
149/// Returns the upper `H::BITS` bits of a packed counter (claimed count).
150fn high(r: F) -> H {
151 (r >> H::BITS) as H
152}
153
154/// Packs two [`H`] values into a [`F`]: `h` in the upper `H::BITS` bits, `l` in the lower.
155fn merge(h: H, l: H) -> F {
156 (h as F) << H::BITS | l as F
157}
158
// ─── Correctness arguments ────────────────────────────────────────────────────
//
// The labels [C1]–[C6] below are cited in SAFETY comments and inline remarks
// throughout `push` and `pop`. They capture the invariants and happens-before
// edges that make the lock-free protocol sound.
//
// [C1] STABILITY PREDICATE
//      v(p) := high(p) == low(p) || low(p) == L
//
//      high(B::p) counts producers that have claimed a slot (incremented before
//      writing); low(B::p) counts producers that have committed their write
//      (incremented with Release after writing). While !v(p) there are in-flight
//      producers: slots in the range [low(p), high(p)) have been reserved but not
//      yet written. It is not safe to read or reason about any such slot. Once v(p)
//      holds, exactly one of two stable states exists:
//        · high(p) == low(p): every claimed slot has been written, so all slots in
//          [0, low(p)) contain fully initialized values.
//        · low(p) == L: the block is full; by the same argument all L slots have
//          been committed.
//      `backoff.snooze()` explicitly yields the current thread so the OS scheduler
//      has the opportunity to run in-flight producers, resolving the stall quickly.
//
// [C2] SLOT VALIDITY
//      If v(p) holds and r_ < low(p), slot r_ contains a fully initialized T.
//
//      low(B::p) is incremented with Release only after the write to the
//      corresponding slot is complete — the write and the commit are sequentially
//      ordered within each producer (see `push`). A consumer's Acquire load of B::p
//      therefore synchronizes-with each producer's Release commit, establishing
//      happens-before with the write to every slot index less than the observed
//      low(p).
//
// [C3] LAST CLAIMER OWNS THE BLOCK TRANSITION
//      Exactly one producer obtains a_ = L-1 from the atomic fetch_add on
//      high(B::p) — specifically, the one whose fetch_add reads back the old value
//      L-1. That producer is the only one to enter the block-transition branch
//      (a_ + 1 == L). At that moment phead still points to the current block p: no
//      other producer may advance phead because advancing phead is the exclusive
//      right of the last claimer, and it has not yet done so. The block transition
//      is therefore race-free with respect to phead and p.n.
//
// [C4] BLOCK-TRANSITION VISIBILITY (release sequences)
//      The block-transition stores — p.n.store(b_, Release) or n.c.store(F::MAX,
//      Release) — are sequenced-before the last claimer's own commit:
//          fetch_add(merge(0, 1), Release) on B::p  [see push, write + commit block]
//      That commit is a Release RMW on B::p. Every subsequent Release RMW on B::p
//      (i.e., every other producer's commit) extends the release sequence headed by
//      the last claimer's commit. An Acquire load of B::p that reads any value within
//      that release sequence synchronizes-with the last claimer's commit, making all
//      writes sequenced-before it (including the block-transition stores) visible to
//      the loading thread. Concretely: a consumer that Acquire-loads B::p and
//      observes low(p) == L can safely dereference p.n and will observe
//      p.n.c == F::MAX — both guaranteed by this synchronization edge.
//
// [C5] NEXT BLOCK IS READY WITHOUT SPINNING
//      When a consumer successfully claims slot r_ and then finds r_ + 1 == L:
//        (i)  r_ < low(p) — just passed the None-guard, so some element was
//             available when we checked.
//        (ii) r_ + 1 == L ⟹ r_ = L-1 ⟹ low(p) > L-1.
//      Since low(p) ≤ L is invariant, low(p) == L.
//      By [C4], the Acquire load of B::p (which observed low(p) == L) already
//      established visibility of the block-transition stores made by the last
//      claiming producer: c.n is non-null and c.n.c == F::MAX are both guaranteed
//      to hold on arrival. No spin is needed to wait for either condition; the former
//      spin loop waiting on n.c has therefore been removed.
//
// [C6] EXCLUSIVE SLOT ACCESS
//      Each index i in [0, L) in B::a is written by exactly one producer — the one
//      that obtains a_ = i from the atomic fetch_add on high(B::p) — and read by
//      exactly one consumer — the one that claims consumer slot r_ = i via the
//      fetch_add or CAS on high(B::c). The atomic operations on B::p and B::c assign
//      disjoint, non-overlapping indices to each thread, so no two threads ever
//      access the same slot concurrently.
//
// ─────────────────────────────────────────────────────────────────────────────
234
impl<T> UBQ<T> {
    /// Creates a new, empty queue.
    ///
    /// No blocks are allocated until the first call to [`push`](Self::push).
    #[inline]
    pub fn new() -> Self {
        // SAFETY:
        // · `Box::new_zeroed().assume_init()` is used for the `p` and `c`
        //   allocations. The zero-initialized type is
        //   `CachePadded<AtomicPtr<B<T>>>`, which contains only an `AtomicPtr`.
        //   The zero bit-pattern for `AtomicPtr` is a null pointer — a valid
        //   and expected initial value meaning "no block allocated yet."
        // · `Box::into_raw` on a non-empty `Box` is always non-null, so
        //   `NonNull::new_unchecked` is sound in all three cases.
        // · The refcount `n` is initialised to `1` via `Box::new(...)`, which
        //   is unconditionally valid.
        unsafe {
            Self {
                p: NonNull::new_unchecked(Box::into_raw(Box::new_zeroed().assume_init())),
                c: NonNull::new_unchecked(Box::into_raw(Box::new_zeroed().assume_init())),
                n: NonNull::new_unchecked(Box::into_raw(Box::new(CachePadded::new(
                    AtomicUsize::new(1),
                )))),
            }
        }
    }

    /// Pushes `e` onto the back of the queue.
    ///
    /// This operation is lock-free and never parks the calling thread. It may
    /// spin briefly at block boundaries while in-flight producers commit their
    /// writes.
    #[doc(alias = "enqueue")]
    #[doc(alias = "send")]
    pub fn push(&self, e: T) {
        let backoff = Backoff::new();
        // Spare block carried across loop iterations. Avoids a redundant allocation
        // when we lose a CAS race during first-block initialization.
        let mut b = None;

        '_0: loop {
            // Load phead with Acquire, pairing with the Release stores that publish
            // or advance phead (the CAS below on first push, or the block-transition
            // stores on subsequent iterations).
            //
            // SAFETY: `self.p` is a `NonNull<AtomicPtr<B<T>>>` backed by a heap
            // allocation created before `UBQ` is shared and never freed while any
            // clone of `UBQ` is alive. `as_ref()` borrows it for the lifetime of
            // `self`.
            let mut p = unsafe { self.p.as_ref() }.load(Ordering::Acquire);

            if p.is_null() {
                // No block exists yet. Allocate one and race to install it as phead.
                //
                // SAFETY: `Box::new_zeroed()` produces a valid heap allocation.
                // `assume_init()` is sound because `B<T>` is composed entirely of:
                // · `AtomicPtr<B<T>>` — bit-pattern zero is a valid null pointer.
                // · `A` (×2) — bit-pattern zero is a valid counter value.
                // · `[UnsafeCell<MaybeUninit<T>>; L]` — `MaybeUninit` requires no
                //   initialization; each slot will be written before being read
                //   ([C2], [C6]).
                let n = Box::into_raw(unsafe { Box::new_zeroed().assume_init() });

                // SAFETY: `self.p` is valid as above.
                match unsafe { self.p.as_ref() }.compare_exchange_weak(
                    p,
                    n,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        // We won the race. By invariant 1, the first block is
                        // immediately open to both producers and consumers, so we
                        // publish it as chead too.
                        //
                        // SAFETY: `self.c` is a valid `NonNull<AtomicPtr<B<T>>>` for
                        // the same reasons as `self.p`. `n` is a live allocation that
                        // we own exclusively — the CAS succeeded and no other thread
                        // has observed this pointer yet.
                        unsafe { self.c.as_ref().store(n, Ordering::Release) };

                        p = n;
                    }
                    Err(r) => {
                        // CAS failed. Recover `n` into a Box to reuse or drop it.
                        //
                        // SAFETY: `n` was produced by `Box::into_raw` moments ago.
                        // Because the CAS failed, no other thread has ever observed
                        // this pointer, so reconstituting ownership is safe.
                        b = Some(unsafe { Box::from_raw(n) });

                        // spurious failure
                        if r.is_null() {
                            continue '_0;
                        }

                        p = r;
                    }
                }
            }

            // Non-binding hint: peek at high(p) before paying for an atomic RMW. If
            // the block already appears full we skip the fetch_add entirely. This load
            // may be stale; the authoritative check is the `a_ >= L` guard below.
            //
            // SAFETY: `p` is a valid non-null pointer to a live `B<T>`. It was either
            // just allocated (null-branch above) or loaded from phead with Acquire,
            // which synchronizes-with the Release that published it. Relaxed ordering
            // is sufficient because this is a non-binding advisory check only.
            let a = unsafe { high((*p).p.load(Ordering::Relaxed)) };

            if a >= L {
                backoff.snooze();
                continue '_0;
            }

            // Atomically claim a slot by incrementing high(B::p). The old value
            // returned is our exclusive slot index. Relaxed ordering is sufficient:
            // correctness relies on the Release commit (the low increment) that
            // follows, not on the claim itself.
            //
            // SAFETY: `p` is valid as established above.
            let a_ = unsafe { high((*p).p.fetch_add(merge(1, 0), Ordering::Relaxed)) };

            // The hint may have been stale, or another producer claimed the last slot
            // between the hint and our fetch_add. In either case we over-claimed.
            // By [C3], only the producer with a_ == L-1 may run the block transition;
            // all others that over-claim simply backoff and retry.
            if a_ >= L {
                backoff.snooze();
                continue '_0;
            }

            // [C3]: We are the last claimer (a_ == L-1). Run the block transition
            // BEFORE writing to slot a_, so that new producers can begin filling the
            // next block without waiting for our write to commit. The Release stores
            // below establish the happens-before edge described in [C4].
            if a_ + 1 == L {
                // Load p.n to determine whether a suitable successor block already
                // exists. Acquire pairs with the Release that stored p.n in a prior
                // block transition (or the initial null pointer).
                //
                // SAFETY: `p` is valid as above.
                let n = unsafe { (*p).n.load(Ordering::Acquire) };

                if n.is_null()
                    || unsafe {
                        // Check whether the existing successor block `n` has been fully
                        // consumed (low(n.c) >= L means all L consumer commits have
                        // occurred) and may therefore be recycled. Acquire pairs with the
                        // Release consumer commits (fetch_add on B::c) in `pop`.
                        //
                        // SAFETY: `n` is non-null — the `||` short-circuits when n is
                        // null, so we only reach this sub-expression when n != null. As a
                        // block in the ring loaded from p.n with Acquire, `n` is a valid
                        // live allocation for the lifetime of `UBQ`.
                        low((*n).c.load(Ordering::Acquire)) < L
                    }
                {
                    // `n` is either absent or not yet fully consumed. Allocate a new
                    // block (or reuse the spare). Zero-initialization is valid for the
                    // same reasons as the first-block allocation above.
                    //
                    // SAFETY: same as the first-block allocation above.
                    let mut b = b
                        .take()
                        .unwrap_or_else(|| unsafe { Box::new_zeroed().assume_init() });

                    // Link the new block into the ring:
                    // · If no successor exists (n is null), b_.n = p, forming a
                    //   two-block cycle.
                    // · If a successor exists but is not yet fully consumed,
                    //   b_.n = n, inserting b_ before n.
                    // b_.c = F::MAX marks it as open to producers (via phead) but
                    // not yet to consumers (sentinel; invariant 1, [C5]).
                    *b.n.get_mut() = if n.is_null() { p } else { n };
                    *b.c.get_mut() = F::MAX;

                    let b_ = Box::into_raw(b);

                    // Advance phead to b_, then link p.n to b_. Both stores are
                    // Release for [C4]: a consumer Acquire-loading B::p and observing
                    // low == L synchronizes-with these stores, guaranteeing that
                    // p.n == b_ and b_.c == F::MAX are visible.
                    //
                    // SAFETY: `b_` is a freshly allocated, exclusively owned block.
                    // `self.p` and `p` are valid as established above. By [C3] we are
                    // the sole thread modifying phead and p.n at this moment.
                    unsafe {
                        self.p.as_ref().store(b_, Ordering::Release);
                        (*p).n.store(b_, Ordering::Release);
                    }
                } else {
                    // `n` is fully consumed (low(n.c) >= L). Recycle it in place.
                    //
                    // The order of the three stores below is significant:
                    //   1. n.p = 0 (Release): reset the producer counter so that new
                    //      producers observing phead == n start claiming from slot 0.
                    //      This must precede the phead advance so producers see a
                    //      clean counter.
                    //   2. phead = n (Release): new producers may now begin pushing
                    //      to n.
                    //   3. n.c = F::MAX (Release): signal to the consumer (the one
                    //      that will claim slot L-1 of the current block p and then
                    //      advance chead) that n is ready for the consumer reset. This
                    //      is stored AFTER the phead advance so that n.p is already
                    //      zeroed before any consumer opens n for reading.
                    //
                    // All three stores are Release for [C4]: a consumer Acquire-loading
                    // B::p and observing low == L synchronizes-with these stores,
                    // making n.p == 0 and n.c == F::MAX visible on arrival.
                    //
                    // SAFETY: `n` is a valid live block confirmed fully consumed.
                    // `self.p` is valid as above. By [C3] we are the sole modifier of
                    // n.p, phead, and n.c at this moment.
                    unsafe {
                        (*n).p.store(0, Ordering::Release);
                        self.p.as_ref().store(n, Ordering::Release);

                        // `n` ready for consumer reset
                        (*n).c.store(F::MAX, Ordering::Release);
                    }
                }
            }

            // Write the element to our exclusively owned slot, then commit by
            // incrementing low(B::p) with Release.
            //
            // SAFETY:
            // · `p` is valid as established above.
            // · `a_` is in [0, L): the `a_ >= L` guard above enforces this.
            // · By [C6], slot a_ is exclusively owned by this producer — the
            //   fetch_add assigned us a unique index that no other thread will
            //   write to or read from before our commit is visible.
            // · Writing through `UnsafeCell::get()` is sound given exclusive
            //   access. `MaybeUninit::new(e)` fully initializes the slot.
            // · The Release ordering on the commit (fetch_add on B::p) ensures
            //   the write to a[a_] happens-before any consumer that Acquire-loads
            //   B::p and observes our commit, satisfying [C2] and [C4].
            unsafe {
                (*p).a
                    .get_unchecked(a_ as usize)
                    .get()
                    .write(MaybeUninit::new(e));
                (*p).p.fetch_add(merge(0, 1), Ordering::Release);
            }

            return;
        }
    }

    /// Removes and returns the front element, or [`None`] if no committed element
    /// is currently available.
    ///
    /// Returns [`None`] when the queue is empty **or** when the current block has
    /// no further committed slots at this instant (in-flight producers may still
    /// be writing). After all producers have finished, `None` reliably indicates
    /// an empty queue.
    ///
    /// This operation is lock-free and never parks the calling thread.
    #[doc(alias = "dequeue")]
    #[doc(alias = "recv")]
    pub fn pop(&self) -> Option<T> {
        let backoff = Backoff::new();

        '_0: loop {
            // Load chead with Acquire, pairing with the Release stores that publish
            // or advance chead (the initial store in `push` for the first block, or
            // the chead-advance store in `pop` for subsequent blocks).
            //
            // SAFETY: `self.c` is a `NonNull<AtomicPtr<B<T>>>` backed by a heap
            // allocation that lives for the duration of `UBQ`, for the same reasons
            // as `self.p` in `push`.
            let c = unsafe { self.c.as_ref().load(Ordering::Acquire).as_ref()? };

            // Load B::c (the consumer counter) for the current block. Acquire pairs
            // with the Release store that last modified B::c: either the chead-advance
            // store that zeroed it (opening this block for consumers), or a consumer
            // commit below.
            let mut r = c.c.load(Ordering::Acquire);

            '_1: loop {
                // r_ = high(B::c) = number of consumer slots already claimed in this block.
                let mut r_ = high(r);

                if r_ >= L {
                    // All L consumer slots have already been claimed by other threads.
                    // The consumer that claimed r_ = L-1 will have advanced (or is about
                    // to advance) chead to the next block. Backoff and reload chead.
                    backoff.snooze();
                    continue '_0;
                }

                // Load B::p to evaluate the stability predicate [C1]. Acquire ordering
                // establishes happens-before with all Release commits made by producers
                // in `push`, satisfying [C2] once v(p) holds.
                let mut p = c.p.load(Ordering::Acquire);

                // [C1] STABILITY PREDICATE: v(p) := high(p) == low(p) || low(p) == L.
                // Until v(p) holds there are in-flight producers: slots in the range
                // [low(p), high(p)) are reserved but not yet written. We must not claim
                // any slot until the block reaches a stable state. `backoff.snooze()`
                // yields the thread to give those producers time to commit.
                let v = |p: F| high(p) == low(p) || low(p) == L;

                if !v(p) {
                    // Spin until stable. Each Acquire reload re-establishes the
                    // happens-before relationship required for [C2].
                    '_2: loop {
                        p = c.p.load(Ordering::Acquire);

                        if v(p) {
                            break;
                        }

                        backoff.snooze();
                    }
                }

                // v(p) now holds. By [C2], every slot in [0, low(p)) contains a fully
                // initialized value. If r_ >= low(p), all committed slots have already
                // been claimed by other consumers — there is nothing left for us in this
                // block. Return None to signal that this block is drained.
                if r_ >= low(p) {
                    return None;
                }

                // Claim a consumer slot. Two paths depending on whether the block is
                // full or partial:
                if low(p) == L {
                    // Full block: unconditional fetch_add. We verified r_ < low(p) = L
                    // above, so at least one slot was available when we checked. Relaxed
                    // ordering is sufficient because the necessary happens-before was
                    // already established by the Acquire load of B::p above ([C2]).
                    let r = high(c.c.fetch_add(merge(1, 0), Ordering::Relaxed));

                    if r >= L {
                        // Another consumer raced ahead and filled all L consumer slots
                        // between our r_ < low(p) check and the fetch_add. Backoff and
                        // reload chead; the chead advance from the winner will have
                        // occurred (or is imminent).
                        backoff.snooze();
                        continue '_0;
                    }

                    r_ = r;
                } else {
                    // Partial block: CAS to prevent over-claiming past low(p). We
                    // increment high(B::c) only if it matches the value we loaded in `r`.
                    // On failure, reload `r` with the fresh value and retry the inner
                    // loop. Relaxed ordering is sufficient (same reasoning as the
                    // full-block path).
                    match c.c.compare_exchange_weak(
                        r,
                        r + merge(1, 0),
                        Ordering::Relaxed,
                        Ordering::Relaxed,
                    ) {
                        Ok(_) => {}
                        Err(rc) => {
                            backoff.spin();

                            r = rc;
                            continue '_1;
                        }
                    }
                }

                // We have exclusively claimed slot r_. Check whether we are the last
                // consumer claimer for this block (r_ == L-1), which makes us
                // responsible for advancing chead.
                //
                // [C5]: r_ < low(p) (passed the None-guard) and r_ + 1 == L together
                // imply low(p) == L (since low(p) ≤ L is invariant). By [C4], the
                // Acquire load of B::p that observed low(p) == L already synchronizes-
                // with the last-claiming producer's Release commit, making the
                // block-transition stores visible: c.n is non-null and
                // c.n.c == F::MAX are both guaranteed on arrival. No spin is needed.
                if r_ + 1 == L {
                    // Load c.n. Acquire pairs with the Release stores in `push` that
                    // wrote p.n (both the allocation and recycle paths). By [C5], the
                    // returned pointer is guaranteed non-null.
                    let n = c.n.load(Ordering::Acquire);

                    // Open the next block for consumers by resetting its consumer
                    // counter to 0, then advance chead.
                    //
                    // SAFETY:
                    // · `n` is non-null by [C5].
                    // · `n` is a valid live block in the ring (loaded from c.n with
                    //   Acquire; all ring blocks are live for the lifetime of `UBQ`).
                    // · We are the exclusive opener: we are the unique claimer of
                    //   consumer slot r_ = L-1 (uniqueness enforced by the atomic
                    //   fetch_add / CAS above), and n.c == F::MAX (by [C5]) confirms
                    //   no other consumer has already opened this block.
                    // · Relaxed on the n.c store is sufficient because the Release
                    //   store of chead immediately after establishes visibility: any
                    //   consumer that Acquire-loads chead and reaches block n will
                    //   observe n.c == 0.
                    unsafe {
                        (*n).c.store(0, Ordering::Relaxed);
                        self.c.as_ref().store(n, Ordering::Release);
                    }
                }

                // Read the element from our exclusively owned slot, then commit by
                // incrementing low(B::c) with Release. The Release on the commit makes
                // our read of this slot visible to producers checking low(B::c) for
                // the recycle condition in `push`.
                //
                // SAFETY:
                // · `r_` is in [0, L): enforced by the r_ >= L guard above and by
                //   the fetch_add / CAS which bound the claimed index to < L.
                // · By [C6], slot r_ is exclusively owned by this consumer — the
                //   fetch_add / CAS assigned us a unique index that no other thread
                //   may read from concurrently.
                // · By [C2], slot r_ contains a fully initialized T: the Acquire
                //   load of B::p established happens-before with the producer's
                //   Release commit for that slot, which was itself sequenced-after
                //   the write to a[r_]. `assume_init()` is therefore sound.
                let e = unsafe { (*c).a.get_unchecked(r_ as usize).get().read().assume_init() };
                c.c.fetch_add(merge(0, 1), Ordering::Release);

                return Some(e);
            }
        }
    }
}
662
663impl<T> Clone for UBQ<T> {
664 fn clone(&self) -> Self {
665 // Increment the shared reference count before constructing the new handle.
666 // Relaxed ordering is sufficient: the increment transfers no data —
667 // producers and consumers synchronize memory through B::p and B::c in
668 // `push`/`pop`. We only need to atomically prevent the count from
669 // underflowing while this clone is alive.
670 //
671 // SAFETY: `self.n` is a `NonNull<CachePadded<AtomicUsize>>` backed by
672 // the heap allocation created in `new()`. Because `self` is a live clone
673 // the reference count is ≥ 1, so the allocation has not been freed.
674 // `as_ref()` borrows it for the lifetime of `self`, which is valid.
675 unsafe { self.n.as_ref().fetch_add(1, Ordering::Relaxed) };
676
677 // `NonNull<_>` is `Copy`, so each field clone is a bitwise copy of the
678 // raw pointer. All clones share the same phead, chead, and refcount
679 // allocations; no data is deep-copied.
680 Self {
681 p: self.p.clone(),
682 c: self.c.clone(),
683 n: self.n.clone(),
684 }
685 }
686}
687
688impl<T> Drop for UBQ<T> {
689 fn drop(&mut self) {
690 // Atomically decrement the reference count. Relaxed ordering is
691 // sufficient: the caller is responsible for ensuring all push/pop
692 // operations are complete before the last clone is destroyed (typically
693 // by joining producer/consumer threads). Under that contract, ownership
694 // of any remaining elements is transferred via thread-join
695 // synchronization rather than through this atomic.
696 //
697 // SAFETY: `self.n` is a valid `NonNull<CachePadded<AtomicUsize>>` for
698 // the same reasons as in `Clone::clone`.
699 let n = unsafe { self.n.as_ref().fetch_sub(1, Ordering::Relaxed) };
700
701 if n == 1 {
702 // We were the last clone (count dropped from 1 to 0). No other
703 // thread holds a `UBQ` handle, so we have exclusive access to all
704 // three shared allocations and to every block in the ring.
705 //
706 // Capture chead before freeing self.c's allocation. `get_mut()`
707 // bypasses atomic operations; sound here because exclusive access
708 // is guaranteed (no other clone exists, no concurrent push/pop).
709 //
710 // SAFETY: `self.c` is a valid `NonNull<CachePadded<AtomicPtr<B<T>>>>`.
711 // `as_mut()` requires exclusive access, which holds because `n == 1`
712 // before the fetch_sub guarantees we are the sole owner.
713 let mut b = unsafe { *self.c.as_mut().get_mut() };
714
715 // Reclaim the three meta-allocations (refcount, phead, chead boxes).
716 // Each pointer was produced by `Box::into_raw` in `new()` and has
717 // not been freed — the previous reference count of 1 ensures this
718 // is the first and only reclamation.
719 //
720 // SAFETY: All three raw pointers are valid, non-null, and
721 // exclusively owned at this point.
722 unsafe {
723 drop(Box::from_raw(self.n.as_ptr()));
724 drop(Box::from_raw(self.p.as_ptr()));
725 drop(Box::from_raw(self.c.as_ptr()));
726 }
727
728 // If no block was ever allocated (queue was created but never
729 // pushed to), chead is null and there is nothing to traverse.
730 if b.is_null() {
731 return;
732 }
733
734 // Traverse the circular block ring starting from chead and drop
735 // every unconsumed element. We stop when we return to the starting
736 // block (b == b_) or hit a null next pointer (ring partially
737 // formed on early teardown).
738 let b_ = b;
739
740 loop {
741 // Read the next pointer and both counters directly, bypassing
742 // atomic operations — valid because we have exclusive access.
743 //
744 // SAFETY: `b` is non-null (checked above; each subsequent
745 // value is a valid ring block's `n` pointer) and points to a
746 // live `B<T>` allocation.
747 let n = unsafe { *(*b).n.get_mut() };
748
749 let p = unsafe { *(*b).p.get_mut() };
750 let c = unsafe { *(*b).c.get_mut() };
751
752 debug_assert!(
753 high(p) == low(p) || low(p) == L,
754 "all producers should be finished before dropping UBQ (p = {}:{})",
755 high(p),
756 low(p)
757 );
758 debug_assert!(
759 high(c) == low(c) || low(c) == L,
760 "all consumers should be finished before dropping UBQ (c = {}:{})",
761 high(c),
762 low(c),
763 );
764
765 // Drop unconsumed elements in this block. The live range of
766 // initialized, unconsumed slots is:
767 // · [0, low(p)) when c == F::MAX — block was never
768 // opened to consumers; every produced value is unconsumed.
769 // · [low(c), low(p)) otherwise — low(c) committed consumer
770 // reads have already moved [0, low(c)) out; the remaining
771 // range [low(c), low(p)) holds initialized unconsumed Ts.
772 //
773 // By [C2], every slot in [0, low(p)) contains a fully
774 // initialized T. The debug_asserts above confirm no in-flight
775 // producers remain, so low(p) accurately reflects all slots
776 // written in this block.
777 for i in if c == F::MAX { 0 } else { low(c) }..low(p) {
778 // SAFETY:
779 // · `b` is a valid `*mut B<T>` as established above.
780 // · `i` is in [0, low(p)) ⊆ [0, L); the offset is in
781 // bounds of the `a` array.
782 // · Slot `i` holds an initialized T (by [C2] and the
783 // range argument above).
784 // · Exclusive access is guaranteed (last-clone
785 // exclusivity); no concurrent reads or writes occur.
786 // · `cast::<T>()` is valid: `UnsafeCell<MaybeUninit<T>>`
787 // is `repr(transparent)` over `MaybeUninit<T>`, which
788 // is `repr(transparent)` over `T`; layout matches.
789 unsafe {
790 (*b).a
791 .as_mut_ptr_range()
792 .start
793 .add(i as usize)
794 .cast::<T>()
795 .drop_in_place()
796 }
797 }
798
799 b = n;
800
801 if b.is_null() || b == b_ {
802 break;
803 }
804 }
805 }
806 }
807}
808
#[cfg(test)]
mod tests {
    use std::{
        fmt::{Debug, Write},
        sync::{
            Arc,
            atomic::{AtomicBool, AtomicUsize, Ordering},
        },
        thread::{self, JoinHandle},
    };

    use super::*;

    /// Test-only `Debug` that walks the block ring starting from phead and
    /// prints each block's packed producer/consumer counters.
    ///
    /// The counters are read non-atomically via `as_ptr()`, so this is only
    /// sound while the queue is quiescent (no concurrent push/pop) — which is
    /// how every test below uses it.
    impl<T> Debug for UBQ<T> {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            // SAFETY (test-only): `self.p` is a valid pointer for the lifetime
            // of this handle; the non-atomic read is acceptable because the
            // tests only format a quiescent queue.
            let p = unsafe { *self.p.as_ref().as_ptr() };

            if p.is_null() {
                // No block was ever allocated (queue never pushed to).
                return writeln!(f, "UBQ {{}}");
            }

            let mut s = String::new();
            let mut c = p;

            // Render a packed counter as "high:low", or "full:full" for the
            // F::MAX sentinel (block not yet opened on that side).
            let fmt = |u: F| -> String {
                if u == F::MAX {
                    "full:full".to_string()
                } else {
                    format!("{:04}:{:04}", high(u), low(u))
                }
            };

            loop {
                // SAFETY (test-only): `c` is a live ring block (non-null,
                // checked above and on each advance below).
                let p_ = unsafe { *(*c).p.as_ptr() };
                let c_ = unsafe { *(*c).c.as_ptr() };

                write!(s, "\t{c:p}: p={}, c={}", fmt(p_), fmt(c_))?;

                c = unsafe { *(*c).n.as_ptr() };
                // Stop when the ring wraps back to the start, or defensively
                // on a null next pointer (ring only partially formed).
                if c.is_null() || c == p {
                    break;
                }

                s.push('\n');
            }

            write!(f, "UBQ {{\n{s}\t}}")
        }
    }

    /// Increments a shared counter when dropped; used to verify that the
    /// queue's `Drop` impl releases exactly the unconsumed elements.
    struct DropProbe {
        dropped: Arc<AtomicUsize>,
    }

    impl DropProbe {
        fn new(dropped: Arc<AtomicUsize>) -> Self {
            Self { dropped }
        }
    }

    impl Drop for DropProbe {
        fn drop(&mut self) {
            self.dropped.fetch_add(1, Ordering::SeqCst);
        }
    }

    #[test]
    fn drop_releases_all_enqueued_values() {
        let token = Arc::new(());
        // Three full blocks plus a partial fourth, so the drop path walks
        // both completely-filled and partially-filled blocks.
        let n = (L as usize * 3) + 7;

        for _ in 0..16 {
            let q = UBQ::new();

            for _ in 0..n {
                q.push(token.clone());
            }

            // n clones live in the queue, plus our local `token`.
            assert_eq!(Arc::strong_count(&token), n + 1);

            println!("q: {q:?}");

            drop(q);

            // Dropping the last handle must release every enqueued clone.
            assert_eq!(Arc::strong_count(&token), 1);
        }
    }

    #[test]
    fn drop_of_final_clone_drops_items_left_in_queue() {
        let dropped = Arc::new(AtomicUsize::new(0));

        let q = UBQ::new();
        let q1 = q.clone();
        let q2 = q.clone();

        // Push across a block boundary (L + 1 from one handle, the rest from
        // another) and pop partway into the first block, so teardown sees a
        // partially-consumed ring.
        let total = (L as usize * 2) + 5;
        let popped = (L as usize / 2) + 1;

        for _ in 0..(L as usize + 1) {
            q.push(DropProbe::new(dropped.clone()));
        }
        for _ in 0..(total - (L as usize + 1)) {
            q1.push(DropProbe::new(dropped.clone()));
        }

        let mut held = Vec::with_capacity(popped);
        for _ in 0..popped {
            held.push(
                q2.pop()
                    .expect("queue should contain enough elements for this test"),
            );
        }

        // Dropping non-final clones must not drop any queued elements.
        drop(q);
        drop(q1);

        assert_eq!(dropped.load(Ordering::SeqCst), 0);

        // Dropping the final clone drops exactly the unconsumed remainder.
        let remaining = total - popped;
        drop(q2);

        assert_eq!(dropped.load(Ordering::SeqCst), remaining);

        // The popped elements were moved out; dropping them accounts for the
        // rest.
        drop(held);

        assert_eq!(dropped.load(Ordering::SeqCst), total);
    }

    #[test]
    fn fill_drain_ordered() {
        let q = UBQ::new();

        // Single-threaded FIFO sanity check across many block allocations.
        let m = 1_000_000;
        for i in 0..m {
            q.push(i);
        }

        for i in 0..m {
            assert_eq!(q.pop(), Some(i));
        }
    }

    #[test]
    fn mpmc_4p4c() {
        let q = UBQ::new();

        // While `flag` is true, producers may still be running, so a failed
        // pop just means "not yet" and the consumer retries. Once all
        // producers have been joined, `flag` flips to false and every
        // remaining pop must succeed (total pushes == total pops).
        let flag = Arc::new(AtomicBool::new(true));

        let pf = |q: UBQ<usize>, m: usize| -> JoinHandle<()> {
            thread::spawn(move || {
                for i in 0..m {
                    q.push(i);
                }
            })
        };

        let cf = |q: UBQ<usize>, m: usize| -> JoinHandle<()> {
            let flag = flag.clone();

            thread::spawn(move || {
                for _ in 0..m {
                    loop {
                        if flag.load(Ordering::Acquire) {
                            if q.pop().is_some() {
                                break;
                            }
                        } else {
                            assert!(q.pop().is_some());
                            break;
                        }
                    }
                }
            })
        };

        // 4 producers and 4 consumers, each moving m elements; the odd count
        // ensures block boundaries don't line up with thread counts.
        let m = 1_000_001;
        let v: Vec<_> = (0..4)
            .map(|_| (pf(q.clone(), m), cf(q.clone(), m)))
            .collect();

        // Join all producers first so the flag flip below is accurate.
        let v: Vec<_> = v
            .into_iter()
            .map(|(p, c)| {
                p.join().unwrap();
                c
            })
            .collect();

        flag.store(false, Ordering::Release);

        for c in v {
            c.join().unwrap()
        }
    }
}