//! A lock-free, unbounded multi-producer/multi-consumer queue backed by a linked
//! ring of fixed-size blocks.
//!
//! # Overview
//!
//! [`UBQ<T>`] is a **lock-free MPMC queue** with no upper bound on capacity.
//! Elements are stored in fixed-size *blocks* that form a circular ring; blocks
//! are allocated on demand and recycled once fully consumed.
//!
//! Every [`UBQ<T>`] handle is cheaply clonable — all clones share the same
//! underlying ring and may call [`UBQ::push`] and [`UBQ::pop`] concurrently from
//! any thread.  Reference counting ensures that the ring (and any unconsumed
//! elements) is freed when the last clone is dropped.
//!
//! Neither [`UBQ::push`] nor [`UBQ::pop`] ever parks the calling thread.  Both
//! operations are *lock-free*: producers and consumers make progress independently.
//! The only spin-waits occur at block boundaries, where a consumer briefly waits
//! for in-flight producers to commit their writes before claiming a slot.
//!
//! # Quick start
//!
//! ```rust
//! use ubq::UBQ;
//! use std::thread;
//!
//! let q: UBQ<u64> = UBQ::new();
//! let q2 = q.clone();
//!
//! let producer = thread::spawn(move || {
//!     for i in 0..1_000_u64 {
//!         q2.push(i);
//!     }
//! });
//!
//! producer.join().unwrap();
//!
//! for i in 0..1_000_u64 {
//!     assert_eq!(q.pop(), Some(i));
//! }
//! assert_eq!(q.pop(), None); // queue is now empty
//! ```
//!
//! # Internal design
//!
//! The queue maintains two atomic head pointers — **phead** (producer head) and
//! **chead** (consumer head) — each pointing into a circular ring of blocks.
//! Within each block, packed counters track claimed and committed producer/consumer
//! slots.  A consumer spins briefly on the *stability predicate* before claiming a
//! slot to guarantee it reads only fully-committed writes.
//!
//! Detailed correctness arguments for each invariant are annotated inline in the
//! source as `[C1]`–`[C6]`.
#![warn(missing_docs)]

use crossbeam_utils::{Backoff, CachePadded};
use std::{
    cell::UnsafeCell,
    mem::MaybeUninit,
    ptr::NonNull,
    sync::atomic::{AtomicPtr, AtomicU64, AtomicUsize, Ordering},
};

/// Atomic storage for a packed pair of counters (`high` = claimed, `low` = committed).
///
/// Must be the atomic twin of [`F`]: every load/RMW on an [`A`] yields an [`F`].
type A = AtomicU64;

/// Packed `F::BITS`-bit counter representation manipulated through [`A`].
/// Layout: upper `H::BITS` bits = claimed count, lower `H::BITS` bits = committed count.
type F = u64;
/// Single `H::BITS`-bit half of a packed [`F`] counter.
///
/// `H::BITS` must be exactly `F::BITS / 2` so that `high`/`low`/`merge` partition
/// an [`F`] without overlap.
type H = u32;
72
/// A lock-free, unbounded multi-producer/multi-consumer (MPMC) queue.
///
/// See the [crate-level documentation](crate) for an overview and quick-start example.
///
/// # Cloning and ownership
///
/// `UBQ<T>` is cheaply clonable: every clone shares the same underlying block ring
/// via reference counting.  All clones may call [`push`](Self::push) and
/// [`pop`](Self::pop) concurrently without additional coordination.  The block ring
/// (including any unconsumed elements) is freed when the **last** clone is dropped.
///
/// # Implementation invariants
///
/// 1. **Block availability:** the first published block is immediately open to both
///    producers and consumers; each later block is initially open only to producers.
///
/// 2. **Stale head pointers fail fast:** a producer (resp. consumer) operating on a
///    block that is no longer `phead` (resp. `chead`) abandons that attempt, reloads
///    the head pointer, and retries.
///
/// 3. **Producer/consumer coordination per block:** `B::p` packs two counters —
///    `high(p)` = claimed slots (incremented before writing) and
///    `low(p)` = committed slots (incremented with `Release` after writing).
///    Before claiming a slot a consumer spin-waits until `high(p) == low(p)`
///    (all claims committed) or `low(p) == L` (block full).  Full blocks use an
///    unconditional `fetch_add` on `B::c`; partial blocks use a `CAS` loop.
///    `B::c` starts at `F::MAX` ("closed to consumers") and is set to `0` by the
///    consumer that advances `chead`.
pub struct UBQ<T> {
    /// Atomic pointer to phead: the block currently accepting producer pushes.
    /// Null until the first `push` installs the first block.
    p: NonNull<CachePadded<AtomicPtr<B<T>>>>,
    /// Atomic pointer to chead: the block currently being drained by consumers.
    /// Null until the first `push` publishes the first block (invariant 1).
    c: NonNull<CachePadded<AtomicPtr<B<T>>>>,
    /// Shared reference count. Incremented by [`Clone`] and decremented by [`Drop`];
    /// when it reaches zero the last clone frees the block ring and drops any
    /// unconsumed elements.
    n: NonNull<CachePadded<AtomicUsize>>,
}
111
// SAFETY: All shared mutable state is accessed through `AtomicPtr` and [`A`]
// operations, or through the `UnsafeCell` slots in `B::a`, which are protected by
// the exclusive-index guarantee [C6] and the Release–Acquire pairing on `B::p` and
// `B::c`. Given `T: Sync` (resp. `T: Send`), concurrent shared access (resp.
// cross-thread transfer) of `T` values through `UBQ` is therefore sound.
unsafe impl<T: Sync> Sync for UBQ<T> {}
unsafe impl<T: Send> Send for UBQ<T> {}
119
/// Number of element slots per block.
const L: H = 32;
// `H::MAX` (as part of `F::MAX` in `B::c`) is the "closed to consumers" sentinel,
// so the legitimate slot count must never collide with it.
const _: () = assert!(
    L != H::MAX,
    "L cannot be H::MAX, as this value is used as a sentinel."
);
126
/// A fixed-size ring-buffer segment.
///
/// `p` and `c` each pack two [`H`] counters into a [`A`] using the upper and lower
/// `H::BITS` bits respectively; see `high()`, `low()`, and `merge()`.
struct B<T> {
    /// Pointer to the next block in the ring.
    n: CachePadded<AtomicPtr<Self>>,
    /// Producer counter. `high(p)` = slots claimed; `low(p)` = slots committed.
    p: CachePadded<A>,
    /// Consumer counter. `high(c)` = slots claimed; `low(c)` = slots committed.
    /// The sentinel value `F::MAX` means the block is not yet open to consumers.
    c: CachePadded<A>,
    /// Element storage. Slot `i` is written by the unique producer that claims index
    /// `i` and read by the unique consumer that claims index `i`. See [C6].
    a: [UnsafeCell<MaybeUninit<T>>; L as usize],
}
143
144/// Returns the lower `H::BITS` bits of a packed counter (committed count).
145fn low(r: F) -> H {
146    r as H
147}
148
149/// Returns the upper `H::BITS` bits of a packed counter (claimed count).
150fn high(r: F) -> H {
151    (r >> H::BITS) as H
152}
153
154/// Packs two [`H`] values into a [`F`]: `h` in the upper `H::BITS` bits, `l` in the lower.
155fn merge(h: H, l: H) -> F {
156    (h as F) << H::BITS | l as F
157}
158
// ─── Correctness arguments ────────────────────────────────────────────────────
//
// The labels [C1]–[C6] below are cited in SAFETY comments and inline remarks
// throughout `push` and `pop`. They capture the invariants and happens-before
// edges that make the lock-free protocol sound.
//
// [C1] STABILITY PREDICATE
//   v(p) := high(p) == low(p)  ||  low(p) == L
//
//   high(B::p) counts producers that have claimed a slot (incremented before
//   writing); low(B::p) counts producers that have committed their write
//   (incremented with Release after writing). While !v(p) there are in-flight
//   producers: slots in the range [low(p), high(p)) have been reserved but not
//   yet written. It is not safe to read or reason about any such slot. Once v(p)
//   holds, exactly one of two stable states exists:
//     · high(p) == low(p): every claimed slot has been written, so all slots in
//       [0, low(p)) contain fully initialized values.
//     · low(p) == L: the block is full; by the same argument all L slots have
//       been committed.
//   `backoff.snooze()` explicitly yields the current thread so the OS scheduler
//   has the opportunity to run in-flight producers, resolving the stall quickly.
//
// [C2] SLOT VALIDITY
//   If v(p) holds and r_ < low(p), slot r_ contains a fully initialized T.
//
//   low(B::p) is incremented with Release only after the write to the
//   corresponding slot is complete — the write and the commit are sequentially
//   ordered within each producer (see `push`). A consumer's Acquire load of B::p
//   therefore synchronizes-with each producer's Release commit, establishing
//   happens-before with the write to every slot index less than the observed
//   low(p).
//
// [C3] LAST CLAIMER OWNS THE BLOCK TRANSITION
//   Exactly one producer obtains a_ = L-1 from the atomic fetch_add on
//   high(B::p) — specifically, the one whose fetch_add reads back the old value
//   L-1. That producer is the only one to enter the block-transition branch
//   (a_ + 1 == L). At that moment phead still points to the current block p: no
//   other producer may advance phead because advancing phead is the exclusive
//   right of the last claimer, and it has not yet done so. The block transition
//   is therefore race-free with respect to phead and p.n.
//
// [C4] BLOCK-TRANSITION VISIBILITY (release sequences)
//   The block-transition stores — p.n.store(b_, Release) or n.c.store(F::MAX,
//   Release) — are sequenced-before the last claimer's own commit:
//       fetch_add(merge(0, 1), Release) on B::p  [see push, write + commit block]
//   That commit is a Release RMW on B::p. Every subsequent Release RMW on B::p
//   (i.e., every other producer's commit) extends the release sequence headed by
//   the last claimer's commit. An Acquire load of B::p that reads any value within
//   that release sequence synchronizes-with the last claimer's commit, making all
//   writes sequenced-before it (including the block-transition stores) visible to
//   the loading thread. Concretely: a consumer that Acquire-loads B::p and
//   observes low(p) == L can safely dereference p.n and will observe
//   p.n.c == F::MAX — both guaranteed by this synchronization edge.
//
// [C5] NEXT BLOCK IS READY WITHOUT SPINNING
//   When a consumer successfully claims slot r_ and then finds r_ + 1 == L:
//     (i)  r_ < low(p) — just passed the None-guard, so some element was
//          available when we checked.
//     (ii) r_ + 1 == L  ⟹  r_ = L-1  ⟹  low(p) > L-1.
//          Since low(p) ≤ L is invariant,  low(p) == L.
//   By [C4], the Acquire load of B::p (which observed low(p) == L) already
//   established visibility of the block-transition stores made by the last
//   claiming producer: c.n is non-null and c.n.c == F::MAX are both guaranteed
//   to hold on arrival. No spin is needed to wait for either condition; the former
//   spin loop waiting on n.c has therefore been removed.
//
// [C6] EXCLUSIVE SLOT ACCESS
//   Each index i in [0, L) in B::a is written by exactly one producer — the one
//   that obtains a_ = i from the atomic fetch_add on high(B::p) — and read by
//   exactly one consumer — the one that claims consumer slot r_ = i via the
//   fetch_add or CAS on high(B::c). The atomic operations on B::p and B::c assign
//   disjoint, non-overlapping indices to each thread, so no two threads ever
//   access the same slot concurrently.
//
// ─────────────────────────────────────────────────────────────────────────────
impl<T> UBQ<T> {
    /// Creates a new, empty queue.
    ///
    /// No blocks are allocated until the first call to [`push`](Self::push).
    #[inline]
    pub fn new() -> Self {
        // Allocate the two head pointers — explicitly null, meaning "no block
        // allocated yet" — and the shared refcount, starting at 1 for this
        // handle.
        //
        // `Box::leak` intentionally leaks each allocation and hands back a
        // `&'static mut` whose pointer is later reclaimed with `Box::from_raw`
        // by the last clone's `Drop`. `NonNull::from` records it without any
        // `unsafe`: unlike the previous `Box::new_zeroed().assume_init()`
        // scheme, no reasoning about zero bit-patterns or uninitialized memory
        // is required, and the non-nullness is guaranteed by construction
        // rather than asserted with `new_unchecked`.
        let head = || {
            NonNull::from(Box::leak(Box::new(CachePadded::new(AtomicPtr::new(
                std::ptr::null_mut(),
            )))))
        };

        Self {
            p: head(),
            c: head(),
            n: NonNull::from(Box::leak(Box::new(CachePadded::new(AtomicUsize::new(1))))),
        }
    }
261
    /// Pushes `e` onto the back of the queue.
    ///
    /// This operation is lock-free and never parks the calling thread.  It may
    /// spin briefly at block boundaries while in-flight producers commit their
    /// writes.
    #[doc(alias = "enqueue")]
    #[doc(alias = "send")]
    pub fn push(&self, e: T) {
        let backoff = Backoff::new();
        // Spare block carried across loop iterations. Avoids a redundant allocation
        // when we lose a CAS race during first-block initialization.
        let mut b = None;

        '_0: loop {
            // Load phead with Acquire, pairing with the Release stores that publish
            // or advance phead (the CAS below on first push, or the block-transition
            // stores on subsequent iterations).
            //
            // SAFETY: `self.p` is a `NonNull<AtomicPtr<B<T>>>` backed by a heap
            // allocation created before `UBQ` is shared and never freed while any
            // clone of `UBQ` is alive. `as_ref()` borrows it for the lifetime of
            // `self`.
            let mut p = unsafe { self.p.as_ref() }.load(Ordering::Acquire);

            if p.is_null() {
                // No block exists yet. Allocate one and race to install it as phead.
                //
                // SAFETY: `Box::new_zeroed()` produces a valid heap allocation.
                // `assume_init()` is sound because `B<T>` is composed entirely of:
                //   · `AtomicPtr<B<T>>` — bit-pattern zero is a valid null pointer.
                //   · `A` (×2)  — bit-pattern zero is a valid counter value.
                //   · `[UnsafeCell<MaybeUninit<T>>; L]` — `MaybeUninit` requires no
                //     initialization; each slot will be written before being read
                //     ([C2], [C6]).
                let n = Box::into_raw(unsafe { Box::new_zeroed().assume_init() });

                // SAFETY: `self.p` is valid as above.
                match unsafe { self.p.as_ref() }.compare_exchange_weak(
                    p,
                    n,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        // We won the race. By invariant 1, the first block is
                        // immediately open to both producers and consumers, so we
                        // publish it as chead too.
                        //
                        // SAFETY: `self.c` is a valid `NonNull<AtomicPtr<B<T>>>` for
                        // the same reasons as `self.p`. `n` is a live allocation that
                        // we own exclusively — the CAS succeeded and no other thread
                        // has observed this pointer yet.
                        unsafe { self.c.as_ref().store(n, Ordering::Release) };

                        p = n;
                    }
                    Err(r) => {
                        // CAS failed. Recover `n` into a Box to reuse or drop it.
                        //
                        // SAFETY: `n` was produced by `Box::into_raw` moments ago.
                        // Because the CAS failed, no other thread has ever observed
                        // this pointer, so reconstituting ownership is safe.
                        b = Some(unsafe { Box::from_raw(n) });

                        // `compare_exchange_weak` may fail spuriously even when the
                        // current value still matched (`r` still null); retry the
                        // outer loop with the spare block retained in `b`.
                        if r.is_null() {
                            continue '_0;
                        }

                        p = r;
                    }
                }
            }

            // Non-binding hint: peek at high(p) before paying for an atomic RMW. If
            // the block already appears full we skip the fetch_add entirely. This load
            // may be stale; the authoritative check is the `a_ >= L` guard below.
            //
            // SAFETY: `p` is a valid non-null pointer to a live `B<T>`. It was either
            // just allocated (null-branch above) or loaded from phead with Acquire,
            // which synchronizes-with the Release that published it. Relaxed ordering
            // is sufficient because this is a non-binding advisory check only.
            let a = unsafe { high((*p).p.load(Ordering::Relaxed)) };

            if a >= L {
                backoff.snooze();
                continue '_0;
            }

            // Atomically claim a slot by incrementing high(B::p). The old value
            // returned is our exclusive slot index. Relaxed ordering is sufficient:
            // correctness relies on the Release commit (the low increment) that
            // follows, not on the claim itself.
            //
            // SAFETY: `p` is valid as established above.
            let a_ = unsafe { high((*p).p.fetch_add(merge(1, 0), Ordering::Relaxed)) };

            // The hint may have been stale, or another producer claimed the last slot
            // between the hint and our fetch_add. In either case we over-claimed.
            // By [C3], only the producer with a_ == L-1 may run the block transition;
            // all others that over-claim simply backoff and retry.
            if a_ >= L {
                backoff.snooze();
                continue '_0;
            }

            // [C3]: We are the last claimer (a_ == L-1). Run the block transition
            // BEFORE writing to slot a_, so that new producers can begin filling the
            // next block without waiting for our write to commit. The Release stores
            // below establish the happens-before edge described in [C4].
            if a_ + 1 == L {
                // Load p.n to determine whether a suitable successor block already
                // exists. Acquire pairs with the Release that stored p.n in a prior
                // block transition (or the initial null pointer).
                //
                // SAFETY: `p` is valid as above.
                let n = unsafe { (*p).n.load(Ordering::Acquire) };

                if n.is_null()
                    || unsafe {
                        // Check whether the existing successor block `n` has been fully
                        // consumed (low(n.c) >= L means all L consumer commits have
                        // occurred) and may therefore be recycled. Acquire pairs with the
                        // Release consumer commits (fetch_add on B::c) in `pop`.
                        //
                        // SAFETY: `n` is non-null — the `||` short-circuits when n is
                        // null, so we only reach this sub-expression when n != null. As a
                        // block in the ring loaded from p.n with Acquire, `n` is a valid
                        // live allocation for the lifetime of `UBQ`.
                        low((*n).c.load(Ordering::Acquire)) < L
                    }
                {
                    // `n` is either absent or not yet fully consumed. Allocate a new
                    // block (or reuse the spare). Zero-initialization is valid for the
                    // same reasons as the first-block allocation above.
                    //
                    // SAFETY: same as the first-block allocation above.
                    let mut b = b
                        .take()
                        .unwrap_or_else(|| unsafe { Box::new_zeroed().assume_init() });

                    // Link the new block into the ring:
                    //   · If no successor exists (n is null), b_.n = p, forming a
                    //     two-block cycle.
                    //   · If a successor exists but is not yet fully consumed,
                    //     b_.n = n, inserting b_ before n.
                    // b_.c = F::MAX marks it as open to producers (via phead) but
                    // not yet to consumers (sentinel; invariant 1, [C5]).
                    *b.n.get_mut() = if n.is_null() { p } else { n };
                    *b.c.get_mut() = F::MAX;

                    let b_ = Box::into_raw(b);

                    // Advance phead to b_, then link p.n to b_. Both stores are
                    // Release for [C4]: a consumer Acquire-loading B::p and observing
                    // low == L synchronizes-with these stores, guaranteeing that
                    // p.n == b_ and b_.c == F::MAX are visible.
                    //
                    // SAFETY: `b_` is a freshly allocated, exclusively owned block.
                    // `self.p` and `p` are valid as established above. By [C3] we are
                    // the sole thread modifying phead and p.n at this moment.
                    unsafe {
                        self.p.as_ref().store(b_, Ordering::Release);
                        (*p).n.store(b_, Ordering::Release);
                    }
                } else {
                    // `n` is fully consumed (low(n.c) >= L). Recycle it in place.
                    //
                    // The order of the three stores below is significant:
                    //   1. n.p = 0 (Release): reset the producer counter so that new
                    //      producers observing phead == n start claiming from slot 0.
                    //      This must precede the phead advance so producers see a
                    //      clean counter.
                    //   2. phead = n (Release): new producers may now begin pushing
                    //      to n.
                    //   3. n.c = F::MAX (Release): signal to the consumer (the one
                    //      that will claim slot L-1 of the current block p and then
                    //      advance chead) that n is ready for the consumer reset. This
                    //      is stored AFTER the phead advance so that n.p is already
                    //      zeroed before any consumer opens n for reading.
                    //
                    // All three stores are Release for [C4]: a consumer Acquire-loading
                    // B::p and observing low == L synchronizes-with these stores,
                    // making n.p == 0 and n.c == F::MAX visible on arrival.
                    //
                    // SAFETY: `n` is a valid live block confirmed fully consumed.
                    // `self.p` is valid as above. By [C3] we are the sole modifier of
                    // n.p, phead, and n.c at this moment.
                    unsafe {
                        (*n).p.store(0, Ordering::Release);
                        self.p.as_ref().store(n, Ordering::Release);

                        // `n` ready for consumer reset
                        (*n).c.store(F::MAX, Ordering::Release);
                    }
                }
            }

            // Write the element to our exclusively owned slot, then commit by
            // incrementing low(B::p) with Release.
            //
            // SAFETY:
            //   · `p` is valid as established above.
            //   · `a_` is in [0, L): the `a_ >= L` guard above enforces this.
            //   · By [C6], slot a_ is exclusively owned by this producer — the
            //     fetch_add assigned us a unique index that no other thread will
            //     write to or read from before our commit is visible.
            //   · Writing through `UnsafeCell::get()` is sound given exclusive
            //     access. `MaybeUninit::new(e)` fully initializes the slot.
            //   · The Release ordering on the commit (fetch_add on B::p) ensures
            //     the write to a[a_] happens-before any consumer that Acquire-loads
            //     B::p and observes our commit, satisfying [C2] and [C4].
            unsafe {
                (*p).a
                    .get_unchecked(a_ as usize)
                    .get()
                    .write(MaybeUninit::new(e));
                (*p).p.fetch_add(merge(0, 1), Ordering::Release);
            }

            return;
        }
    }
484
485    /// Removes and returns the front element, or [`None`] if no committed element
486    /// is currently available.
487    ///
488    /// Returns [`None`] when the queue is empty **or** when the current block has
489    /// no further committed slots at this instant (in-flight producers may still
490    /// be writing). After all producers have finished, `None` reliably indicates
491    /// an empty queue.
492    ///
493    /// This operation is lock-free and never parks the calling thread.
494    #[doc(alias = "dequeue")]
495    #[doc(alias = "recv")]
496    pub fn pop(&self) -> Option<T> {
497        let backoff = Backoff::new();
498
499        '_0: loop {
500            // Load chead with Acquire, pairing with the Release stores that publish
501            // or advance chead (the initial store in `push` for the first block, or
502            // the chead-advance store in `pop` for subsequent blocks).
503            //
504            // SAFETY: `self.c` is a `NonNull<AtomicPtr<B<T>>>` backed by a heap
505            // allocation that lives for the duration of `Q`, for the same reasons as
506            // `self.p` in `push`.
507            let c = unsafe { self.c.as_ref().load(Ordering::Acquire).as_ref()? };
508
509            // Load B::c (the consumer counter) for the current block. Acquire pairs
510            // with the Release store that last modified B::c: either the chead-advance
511            // store that zeroed it (opening this block for consumers), or a consumer
512            // commit below.
513            let mut r = c.c.load(Ordering::Acquire);
514
515            '_1: loop {
516                // r_ = high(B::c) = number of consumer slots already claimed in this block.
517                let mut r_ = high(r);
518
519                if r_ >= L {
520                    // All L consumer slots have already been claimed by other threads.
521                    // The consumer that claimed r_ = L-1 will have advanced (or is about
522                    // to advance) chead to the next block. Backoff and reload chead.
523                    backoff.snooze();
524                    continue '_0;
525                }
526
527                // Load B::p to evaluate the stability predicate [C1]. Acquire ordering
528                // establishes happens-before with all Release commits made by producers
529                // in `push`, satisfying [C2] once v(p) holds.
530                let mut p = c.p.load(Ordering::Acquire);
531
532                // [C1] STABILITY PREDICATE: v(p) := high(p) == low(p) || low(p) == L.
533                // Until v(p) holds there are in-flight producers: slots in the range
534                // [low(p), high(p)) are reserved but not yet written. We must not claim
535                // any slot until the block reaches a stable state. `backoff.snooze()`
536                // yields the thread to give those producers time to commit.
537                let v = |p: F| high(p) == low(p) || low(p) == L;
538
539                if !v(p) {
540                    // Spin until stable. Each Acquire reload re-establishes the
541                    // happens-before relationship required for [C2].
542                    '_2: loop {
543                        p = c.p.load(Ordering::Acquire);
544
545                        if v(p) {
546                            break;
547                        }
548
549                        backoff.snooze();
550                    }
551                }
552
553                // v(p) now holds. By [C2], every slot in [0, low(p)) contains a fully
554                // initialized value. If r_ >= low(p), all committed slots have already
555                // been claimed by other consumers — there is nothing left for us in this
556                // block. Return None to signal that this block is drained.
557                if r_ >= low(p) {
558                    return None;
559                }
560
561                // Claim a consumer slot. Two paths depending on whether the block is
562                // full or partial:
563                if low(p) == L {
564                    // Full block: unconditional fetch_add. We verified r_ < low(p) = L
565                    // above, so at least one slot was available when we checked. Relaxed
566                    // ordering is sufficient because the necessary happens-before was
567                    // already established by the Acquire load of B::p above ([C2]).
568                    let r = high(c.c.fetch_add(merge(1, 0), Ordering::Relaxed));
569
570                    if r >= L {
571                        // Another consumer raced ahead and filled all L consumer slots
572                        // between our r_ < low(p) check and the fetch_add. Backoff and
573                        // reload chead; the chead advance from the winner will have
574                        // occurred (or is imminent).
575                        backoff.snooze();
576                        continue '_0;
577                    }
578
579                    r_ = r;
580                } else {
581                    // Partial block: CAS to prevent over-claiming past low(p). We
582                    // increment high(B::c) only if it matches the value we loaded in `r`.
583                    // On failure, reload `r` with the fresh value and retry the inner
584                    // loop. Relaxed ordering is sufficient (same reasoning as the
585                    // full-block path).
586                    match c.c.compare_exchange_weak(
587                        r,
588                        r + merge(1, 0),
589                        Ordering::Relaxed,
590                        Ordering::Relaxed,
591                    ) {
592                        Ok(_) => {}
593                        Err(rc) => {
594                            backoff.spin();
595
596                            r = rc;
597                            continue '_1;
598                        }
599                    }
600                }
601
602                // We have exclusively claimed slot r_. Check whether we are the last
603                // consumer claimer for this block (r_ == L-1), which makes us
604                // responsible for advancing chead.
605                //
606                // [C5]: r_ < low(p) (passed the None-guard) and r_ + 1 == L together
607                // imply low(p) == L (since low(p) ≤ L is invariant). By [C4], the
608                // Acquire load of B::p that observed low(p) == L already synchronizes-
609                // with the last-claiming producer's Release commit, making the
610                // block-transition stores visible: c.n is non-null and
611                // c.n.c == F::MAX are both guaranteed on arrival. No spin is needed.
612                if r_ + 1 == L {
613                    // Load c.n. Acquire pairs with the Release stores in `push` that
614                    // wrote p.n (both the allocation and recycle paths). By [C5], the
615                    // returned pointer is guaranteed non-null.
616                    let n = c.n.load(Ordering::Acquire);
617
618                    // Open the next block for consumers by resetting its consumer
619                    // counter to 0, then advance chead.
620                    //
621                    // SAFETY:
622                    //   · `n` is non-null by [C5].
623                    //   · `n` is a valid live block in the ring (loaded from c.n with
624                    //     Acquire; all ring blocks are live for the lifetime of `Q`).
625                    //   · We are the exclusive opener: we are the unique claimer of
626                    //     consumer slot r_ = L-1 (uniqueness enforced by the atomic
627                    //     fetch_add / CAS above), and n.c == F::MAX (by [C5]) confirms
628                    //     no other consumer has already opened this block.
629                    //   · Relaxed on the n.c store is sufficient because the Release
630                    //     store of chead immediately after establishes visibility: any
631                    //     consumer that Acquire-loads chead and reaches block n will
632                    //     observe n.c == 0.
633                    unsafe {
634                        (*n).c.store(0, Ordering::Relaxed);
635                        self.c.as_ref().store(n, Ordering::Release);
636                    }
637                }
638
639                // Read the element from our exclusively owned slot, then commit by
640                // incrementing low(B::c) with Release. The Release on the commit makes
641                // our read of this slot visible to producers checking low(B::c) for
642                // the recycle condition in `push`.
643                //
644                // SAFETY:
645                //   · `r_` is in [0, L): enforced by the r_ >= L guard above and by
646                //     the fetch_add / CAS which bound the claimed index to < L.
647                //   · By [C6], slot r_ is exclusively owned by this consumer — the
648                //     fetch_add / CAS assigned us a unique index that no other thread
649                //     may read from concurrently.
650                //   · By [C2], slot r_ contains a fully initialized T: the Acquire
651                //     load of B::p established happens-before with the producer's
652                //     Release commit for that slot, which was itself sequenced-after
653                //     the write to a[r_]. `assume_init()` is therefore sound.
654                let e = unsafe { (*c).a.get_unchecked(r_ as usize).get().read().assume_init() };
655                c.c.fetch_add(merge(0, 1), Ordering::Release);
656
657                return Some(e);
658            }
659        }
660    }
661}
662
663impl<T> Clone for UBQ<T> {
664    fn clone(&self) -> Self {
665        // Increment the shared reference count before constructing the new handle.
666        // Relaxed ordering is sufficient: the increment transfers no data —
667        // producers and consumers synchronize memory through B::p and B::c in
668        // `push`/`pop`. We only need to atomically prevent the count from
669        // underflowing while this clone is alive.
670        //
671        // SAFETY: `self.n` is a `NonNull<CachePadded<AtomicUsize>>` backed by
672        // the heap allocation created in `new()`. Because `self` is a live clone
673        // the reference count is ≥ 1, so the allocation has not been freed.
674        // `as_ref()` borrows it for the lifetime of `self`, which is valid.
675        unsafe { self.n.as_ref().fetch_add(1, Ordering::Relaxed) };
676
677        // `NonNull<_>` is `Copy`, so each field clone is a bitwise copy of the
678        // raw pointer. All clones share the same phead, chead, and refcount
679        // allocations; no data is deep-copied.
680        Self {
681            p: self.p.clone(),
682            c: self.c.clone(),
683            n: self.n.clone(),
684        }
685    }
686}
687
688impl<T> Drop for UBQ<T> {
689    fn drop(&mut self) {
690        // Atomically decrement the reference count. Relaxed ordering is
691        // sufficient: the caller is responsible for ensuring all push/pop
692        // operations are complete before the last clone is destroyed (typically
693        // by joining producer/consumer threads). Under that contract, ownership
694        // of any remaining elements is transferred via thread-join
695        // synchronization rather than through this atomic.
696        //
697        // SAFETY: `self.n` is a valid `NonNull<CachePadded<AtomicUsize>>` for
698        // the same reasons as in `Clone::clone`.
699        let n = unsafe { self.n.as_ref().fetch_sub(1, Ordering::Relaxed) };
700
701        if n == 1 {
702            // We were the last clone (count dropped from 1 to 0). No other
703            // thread holds a `UBQ` handle, so we have exclusive access to all
704            // three shared allocations and to every block in the ring.
705            //
706            // Capture chead before freeing self.c's allocation. `get_mut()`
707            // bypasses atomic operations; sound here because exclusive access
708            // is guaranteed (no other clone exists, no concurrent push/pop).
709            //
710            // SAFETY: `self.c` is a valid `NonNull<CachePadded<AtomicPtr<B<T>>>>`.
711            // `as_mut()` requires exclusive access, which holds because `n == 1`
712            // before the fetch_sub guarantees we are the sole owner.
713            let mut b = unsafe { *self.c.as_mut().get_mut() };
714
715            // Reclaim the three meta-allocations (refcount, phead, chead boxes).
716            // Each pointer was produced by `Box::into_raw` in `new()` and has
717            // not been freed — the previous reference count of 1 ensures this
718            // is the first and only reclamation.
719            //
720            // SAFETY: All three raw pointers are valid, non-null, and
721            // exclusively owned at this point.
722            unsafe {
723                drop(Box::from_raw(self.n.as_ptr()));
724                drop(Box::from_raw(self.p.as_ptr()));
725                drop(Box::from_raw(self.c.as_ptr()));
726            }
727
728            // If no block was ever allocated (queue was created but never
729            // pushed to), chead is null and there is nothing to traverse.
730            if b.is_null() {
731                return;
732            }
733
734            // Traverse the circular block ring starting from chead and drop
735            // every unconsumed element. We stop when we return to the starting
736            // block (b == b_) or hit a null next pointer (ring partially
737            // formed on early teardown).
738            let b_ = b;
739
740            loop {
741                // Read the next pointer and both counters directly, bypassing
742                // atomic operations — valid because we have exclusive access.
743                //
744                // SAFETY: `b` is non-null (checked above; each subsequent
745                // value is a valid ring block's `n` pointer) and points to a
746                // live `B<T>` allocation.
747                let n = unsafe { *(*b).n.get_mut() };
748
749                let p = unsafe { *(*b).p.get_mut() };
750                let c = unsafe { *(*b).c.get_mut() };
751
752                debug_assert!(
753                    high(p) == low(p) || low(p) == L,
754                    "all producers should be finished before dropping UBQ (p = {}:{})",
755                    high(p),
756                    low(p)
757                );
758                debug_assert!(
759                    high(c) == low(c) || low(c) == L,
760                    "all consumers should be finished before dropping UBQ (c = {}:{})",
761                    high(c),
762                    low(c),
763                );
764
765                // Drop unconsumed elements in this block. The live range of
766                // initialized, unconsumed slots is:
767                //   · [0, low(p))        when c == F::MAX — block was never
768                //     opened to consumers; every produced value is unconsumed.
769                //   · [low(c), low(p))   otherwise — low(c) committed consumer
770                //     reads have already moved [0, low(c)) out; the remaining
771                //     range [low(c), low(p)) holds initialized unconsumed Ts.
772                //
773                // By [C2], every slot in [0, low(p)) contains a fully
774                // initialized T. The debug_asserts above confirm no in-flight
775                // producers remain, so low(p) accurately reflects all slots
776                // written in this block.
777                for i in if c == F::MAX { 0 } else { low(c) }..low(p) {
778                    // SAFETY:
779                    //   · `b` is a valid `*mut B<T>` as established above.
780                    //   · `i` is in [0, low(p)) ⊆ [0, L); the offset is in
781                    //     bounds of the `a` array.
782                    //   · Slot `i` holds an initialized T (by [C2] and the
783                    //     range argument above).
784                    //   · Exclusive access is guaranteed (last-clone
785                    //     exclusivity); no concurrent reads or writes occur.
786                    //   · `cast::<T>()` is valid: `UnsafeCell<MaybeUninit<T>>`
787                    //     is `repr(transparent)` over `MaybeUninit<T>`, which
788                    //     is `repr(transparent)` over `T`; layout matches.
789                    unsafe {
790                        (*b).a
791                            .as_mut_ptr_range()
792                            .start
793                            .add(i as usize)
794                            .cast::<T>()
795                            .drop_in_place()
796                    }
797                }
798
799                b = n;
800
801                if b.is_null() || b == b_ {
802                    break;
803                }
804            }
805        }
806    }
807}
808
#[cfg(test)]
mod tests {
    use std::{
        fmt::{Debug, Write},
        sync::{
            Arc,
            atomic::{AtomicBool, AtomicUsize, Ordering},
        },
        thread::{self, JoinHandle},
    };

    use super::*;

    /// Test-only `Debug` for [`UBQ`]: walks the block ring starting at phead
    /// and prints each block's packed producer/consumer counters.
    ///
    /// NOTE(review): reads the ring non-atomically via `as_ptr()`; only
    /// meaningful while no concurrent push/pop mutates the queue. Assumes the
    /// ring is fully formed (every `n` pointer non-null once phead is
    /// non-null) — TODO confirm against `push`'s allocation path.
    impl<T> Debug for UBQ<T> {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            let p = unsafe { *self.p.as_ref().as_ptr() };

            // A queue that was never pushed to has no blocks at all.
            if p.is_null() {
                return writeln!(f, "UBQ {{}}");
            }

            let mut s = String::new();
            let mut c = p;

            // Render a packed counter as "high:low"; the sentinel F::MAX
            // (block not yet opened to this side) prints as "full:full".
            let fmt = |u: F| -> String {
                if u == F::MAX {
                    "full:full".to_string()
                } else {
                    format!("{:04}:{:04}", high(u), low(u))
                }
            };

            loop {
                let p_ = unsafe { *(*c).p.as_ptr() };
                let c_ = unsafe { *(*c).c.as_ptr() };

                write!(s, "\t{c:p}: p={}, c={}", fmt(p_), fmt(c_))?;

                // Follow the ring; it is circular, so arriving back at the
                // starting block terminates the walk.
                c = unsafe { *(*c).n.as_ptr() };
                if c == p {
                    break;
                }

                s.push('\n');
            }

            write!(f, "UBQ {{\n{s}\t}}")
        }
    }

    /// Increments a shared counter when dropped; used to verify that the
    /// queue's teardown releases exactly the elements left inside it.
    struct DropProbe {
        dropped: Arc<AtomicUsize>,
    }

    impl DropProbe {
        fn new(dropped: Arc<AtomicUsize>) -> Self {
            Self { dropped }
        }
    }

    impl Drop for DropProbe {
        fn drop(&mut self) {
            self.dropped.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Dropping a queue that still holds elements must release every one of
    /// them exactly once, observed via `Arc` strong counts.
    #[test]
    fn drop_releases_all_enqueued_values() {
        let token = Arc::new(());
        // Several full blocks plus a partial one, to exercise both branches
        // of the teardown's per-block live-range computation.
        let n = (L as usize * 3) + 7;

        for _ in 0..16 {
            let q = UBQ::new();

            for _ in 0..n {
                q.push(token.clone());
            }

            // One strong count per enqueued clone, plus our own handle.
            assert_eq!(Arc::strong_count(&token), n + 1);

            println!("q: {q:?}");

            drop(q);

            // Teardown must have dropped all n clones.
            assert_eq!(Arc::strong_count(&token), 1);
        }
    }

    /// Only the drop of the *final* clone may release queued elements;
    /// earlier clone drops must not touch them, and popped elements stay
    /// owned by the caller.
    #[test]
    fn drop_of_final_clone_drops_items_left_in_queue() {
        let dropped = Arc::new(AtomicUsize::new(0));

        let q = UBQ::new();
        let q1 = q.clone();
        let q2 = q.clone();

        let total = (L as usize * 2) + 5;
        let popped = (L as usize / 2) + 1;

        // Push from two different clones to confirm they share one ring.
        for _ in 0..(L as usize + 1) {
            q.push(DropProbe::new(dropped.clone()));
        }
        for _ in 0..(total - (L as usize + 1)) {
            q1.push(DropProbe::new(dropped.clone()));
        }

        // Pop some elements and hold them alive outside the queue.
        let mut held = Vec::with_capacity(popped);
        for _ in 0..popped {
            held.push(
                q2.pop()
                    .expect("queue should contain enough elements for this test"),
            );
        }

        // Dropping non-final clones must not drop any queued element.
        drop(q);
        drop(q1);

        assert_eq!(dropped.load(Ordering::SeqCst), 0);

        // The final clone's drop releases exactly the unconsumed remainder.
        let remaining = total - popped;
        drop(q2);

        assert_eq!(dropped.load(Ordering::SeqCst), remaining);

        // The popped elements are dropped only when the caller releases them.
        drop(held);

        assert_eq!(dropped.load(Ordering::SeqCst), total);
    }

    /// Single-threaded FIFO check across many block boundaries.
    #[test]
    fn fill_drain_ordered() {
        let q = UBQ::new();

        let m = 1_000_000;
        for i in 0..m {
            q.push(i);
        }

        for i in 0..m {
            assert_eq!(q.pop(), Some(i));
        }
    }

    /// Stress test: 4 producers and 4 consumers moving 4 × 1_000_001
    /// elements through one shared queue.
    #[test]
    fn mpmc_4p4c() {
        let q = UBQ::new();

        // While true, consumers spin on transient emptiness (producers may
        // still be running). Once all producers have joined, the flag flips
        // and every remaining pop must succeed on the first try.
        let flag = Arc::new(AtomicBool::new(true));

        let pf = |q: UBQ<usize>, m: usize| -> JoinHandle<()> {
            thread::spawn(move || {
                for i in 0..m {
                    q.push(i);
                }
            })
        };

        let cf = |q: UBQ<usize>, m: usize| -> JoinHandle<()> {
            let flag = flag.clone();

            thread::spawn(move || {
                for _ in 0..m {
                    loop {
                        if flag.load(Ordering::Acquire) {
                            if q.pop().is_some() {
                                break;
                            }
                        } else {
                            // All elements are committed; emptiness here
                            // would mean lost elements.
                            assert!(q.pop().is_some());
                            break;
                        }
                    }
                }
            })
        };

        let m = 1_000_001;
        let v: Vec<_> = (0..4)
            .map(|_| (pf(q.clone(), m), cf(q.clone(), m)))
            .collect();

        // Join producers first so the flag flip below is accurate.
        let v: Vec<_> = v
            .into_iter()
            .map(|(p, c)| {
                p.join().unwrap();
                c
            })
            .collect();

        flag.store(false, Ordering::Release);

        for c in v {
            c.join().unwrap()
        }
    }
}
1005}