oxideav_core/arena/sync.rs

//! `Send + Sync` mirror of the parent [`crate::arena`] module.
//!
//! This module exposes the same four-type API ([`ArenaPool`],
//! [`Arena`], [`Frame`], [`FrameInner`]) as its sibling, with one
//! difference that ripples through the whole shape:
//!
//! - [`Arena`] uses `AtomicUsize` / `AtomicU32` for its bump cursor
//!   and allocation counter (instead of `Cell<usize>` / `Cell<u32>`),
//!   and is therefore `Send + Sync`.
//! - [`Frame`] is `Arc<FrameInner>` (instead of `Rc<FrameInner>`),
//!   so a decoded frame can be moved or shared across threads.
//! - [`FrameInner`] holds a sync [`Arena`], so it is itself `Send +
//!   Sync` and `Arc<FrameInner>: Send + Sync` falls out for free.
//!
//! ## When to use which
//!
//! Use [`crate::arena`] (the `Rc` variant) when the decoder produces
//! frames on the same thread that consumes them. The bump cursor is
//! a plain `Cell<usize>` and there are no atomic operations on the
//! hot allocation path.
//!
//! Use this module (the `Arc` variant) when the decoder hands frames
//! to a different thread — the typical case for a pipeline that
//! decodes on one worker and renders / encodes / transmits on
//! another. The cost is an atomic load + CAS per allocation and an
//! atomic refcount per frame clone; both are negligible compared to
//! the actual decode work.
//!
//! ## Concurrent allocation contract
//!
//! [`Arena::alloc`] uses a CAS loop on the cursor, so two threads
//! that both call [`Arena::alloc`] on the same `&Arena` will receive
//! disjoint slices (the loser of the CAS retries against the new
//! cursor). The returned `&mut [T]` points into a region that no
//! other in-flight `alloc()` call can also receive, and the slice's
//! lifetime is bounded by the borrow of `&self`.
//!
//! In practice the typical pattern is **one decoder thread allocates,
//! then freezes into a [`Frame`] which is shared read-only across
//! threads** — concurrent allocation is supported but rarely useful.
//! The bytes returned by [`Arena::alloc`] are zero-filled (the pool
//! zeroes each buffer when it is first allocated and again when it is
//! returned), so unwritten bytes read back as `T::zeroed()`; the
//! intended pattern is still for callers to fully overwrite the slice
//! before reading it.
//!
//! Everything else (per-arena byte cap, per-arena allocation-count
//! cap, weak handle back to the pool for `Drop`-time release, the
//! `FrameHeader` shape, plane validation in [`FrameInner::new`])
//! matches the parent module exactly.
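//!
//! ## Example
//!
//! A minimal sketch of the intended flow. It is marked `ignore`
//! because the `use` paths and the `PixelFormat` variant are
//! assumptions of this sketch, not something this module defines:
//!
//! ```ignore
//! use oxideav_core::arena::sync::{ArenaPool, FrameInner};
//! use oxideav_core::arena::FrameHeader;
//! use oxideav_core::format::PixelFormat;
//!
//! // One 4 KiB arena slot; the buffer is allocated lazily on first lease.
//! let pool = ArenaPool::new(1, 4096);
//!
//! // Decoder side: lease an arena, fill one plane, freeze into a Frame.
//! let arena = pool.lease().unwrap();
//! let plane: &mut [u8] = arena.alloc::<u8>(16).unwrap();
//! plane.fill(0x80);
//! let header = FrameHeader::new(4, 4, PixelFormat::Gray8, Some(0));
//! let frame = FrameInner::new(arena, &[(0, 16)], header).unwrap();
//!
//! // Consumer side: the `Arc<FrameInner>` moves across the thread
//! // boundary; the buffer returns to the pool when the last clone drops.
//! let worker = std::thread::spawn(move || frame.plane(0).unwrap().len());
//! assert_eq!(worker.join().unwrap(), 16);
//! ```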

use std::mem::{align_of, size_of};
use std::ptr::NonNull;
use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, Weak};

use crate::error::{Error, Result};

// Re-export the shared `FrameHeader` and `MAX_PLANES` constant so
// users of either arena module see the same metadata shape — there is
// no thread-safety angle to either of them, and duplicating them
// would only add drift.
pub use super::{FrameHeader, MAX_PLANES};
// `Buffer` and `MAX_ALIGN` are shared with the parent module — the
// pool-backing storage and the soundness-critical alignment constant
// are identical for the `Rc` and `Arc` variants. See `arena/mod.rs`
// for the full soundness rationale.
use super::{Buffer, MAX_ALIGN};

/// `Send + Sync` pool of reusable byte buffers for arena-backed frame
/// allocations. Mirrors [`crate::arena::ArenaPool`] in shape and
/// behaviour; the only difference is that the [`Arena`] it hands out
/// (and the [`Frame`] that ends up holding it) is itself `Send + Sync`.
///
/// Construct via [`ArenaPool::new`]. Lease an [`Arena`] per frame via
/// [`ArenaPool::lease`]; drop the arena (or drop the last clone of a
/// [`Frame`] holding it) to return its buffer to the pool.
pub struct ArenaPool {
    inner: Mutex<PoolInner>,
    cap_per_arena: usize,
    max_arenas: usize,
    max_alloc_count_per_arena: u32,
}

struct PoolInner {
    /// Buffers currently sitting idle in the pool (ready to lease).
    idle: Vec<Buffer>,
    /// Total buffers ever allocated by this pool (idle + in-flight).
    /// Caps lazy growth at `max_arenas`.
    total_allocated: usize,
}

impl ArenaPool {
    /// Construct a new pool with `max_arenas` buffer slots, each of
    /// `cap_per_arena` bytes. Buffers are allocated lazily on first
    /// lease — a freshly constructed pool holds no memory.
    ///
    /// Per-arena allocation count is capped at a generous 1 M
    /// (override via [`ArenaPool::with_alloc_count_cap`]).
    pub fn new(max_arenas: usize, cap_per_arena: usize) -> Arc<Self> {
        Self::with_alloc_count_cap(max_arenas, cap_per_arena, 1_000_000)
    }

    /// Like [`ArenaPool::new`] but lets the caller set the per-arena
    /// allocation-count cap. Useful when the caller is plumbing
    /// [`crate::DecoderLimits`] through.
    pub fn with_alloc_count_cap(
        max_arenas: usize,
        cap_per_arena: usize,
        max_alloc_count_per_arena: u32,
    ) -> Arc<Self> {
        Arc::new(Self {
            inner: Mutex::new(PoolInner {
                idle: Vec::with_capacity(max_arenas),
                total_allocated: 0,
            }),
            cap_per_arena,
            max_arenas,
            max_alloc_count_per_arena,
        })
    }

    /// Capacity of each arena buffer this pool hands out, in bytes.
    pub fn cap_per_arena(&self) -> usize {
        self.cap_per_arena
    }

    /// Maximum number of arenas that may be checked out at once.
    pub fn max_arenas(&self) -> usize {
        self.max_arenas
    }

    /// Lease one arena from the pool. Returns
    /// [`Error::ResourceExhausted`] if every arena slot is already
    /// checked out by an [`Arena`] (or a [`Frame`] holding one).
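    ///
    /// A short sketch (marked `ignore`; the pool sizes are arbitrary
    /// and the paths are assumptions of this example):
    ///
    /// ```ignore
    /// let pool = ArenaPool::new(1, 4096);
    /// let first = pool.lease().expect("one slot free");
    /// // The only slot is checked out, so a second lease fails...
    /// assert!(pool.lease().is_err());
    /// // ...until the first arena is dropped and its buffer returns.
    /// drop(first);
    /// assert!(pool.lease().is_ok());
    /// ```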
    pub fn lease(self: &Arc<Self>) -> Result<Arena> {
        let buffer = {
            let mut inner = self.inner.lock().expect("ArenaPool mutex poisoned");
            if let Some(buf) = inner.idle.pop() {
                buf
            } else if inner.total_allocated < self.max_arenas {
                inner.total_allocated += 1;
                Buffer::new_zeroed(self.cap_per_arena)
            } else {
                return Err(Error::resource_exhausted(format!(
                    "ArenaPool exhausted: all {} arenas checked out",
                    self.max_arenas
                )));
            }
        };

        let base = buffer.ptr;
        Ok(Arena {
            buffer: Mutex::new(Some(buffer)),
            base,
            cursor: AtomicUsize::new(0),
            alloc_count: AtomicU32::new(0),
            cap: self.cap_per_arena,
            alloc_count_cap: self.max_alloc_count_per_arena,
            pool: Arc::downgrade(self),
        })
    }

    /// Return a buffer to the idle list. Called from `Arena::Drop`;
    /// not part of the public API. The buffer is zeroed before being
    /// returned so the next lease starts from a clean state — this is
    /// what makes `Zeroable` a sufficient bound on `Arena::alloc<T>`
    /// across pool reuse cycles.
    fn release(&self, mut buffer: Buffer) {
        buffer.zero();
        if let Ok(mut inner) = self.inner.lock() {
            inner.idle.push(buffer);
        }
        // If the lock is poisoned, drop the buffer normally — the
        // pool is in an unusable state already.
    }
}

/// One leased buffer from an [`ArenaPool`]. `Send + Sync`.
///
/// Allocations are bump-pointer on an atomic cursor: each call to
/// [`Arena::alloc`] CAS-advances the cursor and returns a fresh
/// aligned slice carved out of the buffer at the old position. There
/// is no per-allocation header and no individual free — the entire
/// arena is reset (returned to the pool) only when the `Arena` is
/// dropped.
///
/// Concurrent calls to [`Arena::alloc`] on the same `&Arena` are
/// supported and produce disjoint slices (the CAS loser retries
/// against the new cursor). See the module docs for the full
/// concurrency contract.
pub struct Arena {
    /// Backing buffer leased from the pool. Stored in a `Mutex` so
    /// `Drop` can `take()` the buffer without needing direct
    /// `UnsafeCell` access (which would re-borrow the whole storage
    /// and invalidate previously-returned slices under stacked
    /// borrows). Outside of `Drop` this is always `Some` — the
    /// mutex itself is essentially uncontended (nothing else touches
    /// it on the hot path).
    ///
    /// We never re-borrow this buffer mutably while handing out
    /// slices from it — the typed pointers returned by `alloc` are
    /// derived from the cached raw `base` pointer below, never from
    /// a fresh borrow of the whole storage. This is what avoids the
    /// stacked-borrows whole-buffer-retag race that Miri reported
    /// when two threads called `alloc` concurrently while a third
    /// held a previously-returned `&mut [T]`.
    buffer: Mutex<Option<Buffer>>,
    /// Cached base pointer of `buffer` (a [`MAX_ALIGN`]-aligned
    /// allocation owned by `buffer`). Stable for the lifetime of the
    /// arena: `Buffer` does not move its allocation, and we only take
    /// `buffer` out of the mutex during `Drop` after no allocator
    /// activity remains. All `alloc` calls derive their typed
    /// pointers from `base.as_ptr().add(offset)`.
    base: NonNull<u8>,
    /// Atomic bump cursor: the next free byte offset within the
    /// buffer.
    cursor: AtomicUsize,
    /// Atomic allocation counter.
    alloc_count: AtomicU32,
    /// Cached cap (== `pool.cap_per_arena` at lease time).
    cap: usize,
    /// Cached cap (== `pool.max_alloc_count_per_arena` at lease time).
    alloc_count_cap: u32,
    /// Weak handle back to the pool so `Drop` can return the buffer.
    pool: Weak<ArenaPool>,
}

// SAFETY: `Arena` owns its buffer's allocation outright (no shared
// ownership), all cursor/count mutations go through atomics, and the
// raw `base` pointer is only used to derive disjoint typed slices
// whose ranges the CAS loop guarantees not to overlap. The `Drop`
// path takes the buffer out of the mutex under `&mut self` — no
// other thread can be in `alloc` at that point.
unsafe impl Send for Arena {}
// SAFETY: `&Arena::alloc` mutates only via the atomic cursor and the
// allocation counter (themselves `Sync`) and writes into a region of
// the buffer that no other in-flight call has been handed (CAS
// guarantees disjoint regions). The raw `base` pointer is never used
// to materialise a whole-buffer mutable borrow, so a new `alloc`
// call cannot invalidate any other thread's previously returned
// `&mut [T]` slice under stacked borrows.
unsafe impl Sync for Arena {}

impl Arena {
    /// Capacity of this arena in bytes.
    pub fn capacity(&self) -> usize {
        self.cap
    }

    /// Bytes consumed by allocations so far.
    pub fn used(&self) -> usize {
        self.cursor.load(Ordering::Acquire)
    }

    /// Number of allocations performed so far.
    pub fn alloc_count(&self) -> u32 {
        self.alloc_count.load(Ordering::Acquire)
    }

    /// `true` once the per-arena allocation-count cap has been
    /// reached. Decoders that produce many small allocations should
    /// poll this and bail with [`Error::ResourceExhausted`] when it
    /// flips, instead of waiting for the next [`Arena::alloc`] call
    /// to fail.
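    ///
    /// A hypothetical decoder loop sketching that pattern (everything
    /// except the arena methods is a placeholder name):
    ///
    /// ```ignore
    /// for unit in bitstream_units {
    ///     if arena.alloc_count_exceeded() {
    ///         return Err(Error::resource_exhausted("too many allocations".to_string()));
    ///     }
    ///     let out: &mut [u8] = arena.alloc::<u8>(unit.decoded_len())?;
    ///     decode_unit_into(&unit, out)?;
    /// }
    /// ```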
    pub fn alloc_count_exceeded(&self) -> bool {
        self.alloc_count.load(Ordering::Acquire) >= self.alloc_count_cap
    }

    /// Allocate `count` `T`s out of this arena. Returns a borrowed
    /// `&mut [T]` (lifetime bounded by the borrow of `self`).
    ///
    /// The returned slice points at zero-filled bytes (the pool
    /// zero-fills on initial allocation and again whenever a buffer
    /// is returned). The `Zeroable` bound on `T` guarantees that an
    /// all-zero bit pattern is a valid value for `T`, so reading the
    /// slice without first writing it is sound. The intended pattern
    /// is still "decoder fills the slice, then reads back what it
    /// wrote" — but unwritten bytes will read back as `T::zeroed()`
    /// rather than as UB.
    ///
    /// Returns [`Error::ResourceExhausted`] if either the per-arena
    /// byte cap or the per-arena allocation-count cap would be
    /// exceeded.
    ///
    /// # Type bounds
    ///
    /// - `T: bytemuck::Zeroable` — pool buffers are zero-filled, so
    ///   handing back `&mut [T]` over those bytes is only sound when
    ///   the all-zero bit pattern is valid for `T`. This rules out
    ///   `NonZeroU8`/`NonZeroU16`/…/references/function pointers/
    ///   niche-optimised enums.
    /// - `align_of::<T>() <= MAX_ALIGN` — checked at compile time via
    ///   a `const` assertion. The pool buffer's base pointer is
    ///   aligned to [`MAX_ALIGN`] (= 64 bytes); per-`T` alignment is
    ///   then a relative-offset adjustment of the bump cursor.
    /// - The arena does not run destructors on allocated values, so
    ///   `T` should not have meaningful `Drop` glue.
    ///
    /// **Concurrency:** the bump cursor is advanced via a CAS loop,
    /// so concurrent `alloc` calls on the same `&Arena` produce
    /// disjoint slices. The CAS loser retries against the new
    /// cursor; in the uncontended case the cost is a single acquire
    /// load plus one successful CAS. Crucially, no `alloc` call
    /// re-borrows the whole buffer (the typed pointer is derived
    /// from the cached raw base pointer), so concurrent allocators
    /// cannot invalidate each other's previously-returned slices
    /// under stacked borrows.
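    ///
    /// A minimal sketch (marked `ignore`; the pool sizes are
    /// arbitrary):
    ///
    /// ```ignore
    /// let pool = ArenaPool::new(1, 4096);
    /// let arena = pool.lease().unwrap();
    /// // 16 zero-filled u32s, aligned to align_of::<u32>().
    /// let samples: &mut [u32] = arena.alloc::<u32>(16).unwrap();
    /// assert!(samples.iter().all(|&s| s == 0));
    /// samples[0] = 0xDEAD_BEEF;
    /// assert_eq!(arena.used(), 64);
    /// ```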
    #[allow(clippy::mut_from_ref)] // see "Concurrency" doc above.
    pub fn alloc<T>(&self, count: usize) -> Result<&mut [T]>
    where
        T: bytemuck::Zeroable,
    {
        // Compile-time check: T's alignment must not exceed the
        // pool buffer's base alignment. Doing this as a const-eval'd
        // assert means a violating monomorphisation fails the build.
        const fn assert_align<T>() {
            assert!(
                align_of::<T>() <= MAX_ALIGN,
                "Arena::alloc<T>: align_of::<T>() exceeds MAX_ALIGN; \
                 increase MAX_ALIGN in arena/mod.rs"
            );
        }
        const { assert_align::<T>() };

        // Allocation-count cap. Increment first; if we overshoot,
        // roll back so subsequent calls still see the correct value.
        let prev_count = self.alloc_count.fetch_add(1, Ordering::AcqRel);
        if prev_count >= self.alloc_count_cap {
            // Roll back so `alloc_count_exceeded()` keeps returning
            // a stable cap value rather than drifting upward.
            self.alloc_count.fetch_sub(1, Ordering::AcqRel);
            return Err(Error::resource_exhausted(format!(
                "Arena alloc-count cap of {} exceeded",
                self.alloc_count_cap
            )));
        }

        let elem_size = size_of::<T>();
        let elem_align = align_of::<T>();
        // Bytes requested.
        let bytes = elem_size.checked_mul(count).ok_or_else(|| {
            // Roll back the alloc-count bump on size-overflow too.
            self.alloc_count.fetch_sub(1, Ordering::AcqRel);
            Error::resource_exhausted("Arena alloc size overflow".to_string())
        })?;

        // CAS loop on the cursor. We compute aligned + new_cursor
        // from the latest observed cursor value, then attempt to
        // claim that range; if another thread won the race, retry
        // against the updated cursor.
        let mut current = self.cursor.load(Ordering::Acquire);
        let aligned;
        let new_cursor;
        loop {
            let candidate_aligned = match align_up(current, elem_align) {
                Some(a) => a,
                None => {
                    self.alloc_count.fetch_sub(1, Ordering::AcqRel);
                    return Err(Error::resource_exhausted(
                        "Arena cursor alignment overflow".to_string(),
                    ));
                }
            };
            let candidate_new = match candidate_aligned.checked_add(bytes) {
                Some(n) => n,
                None => {
                    self.alloc_count.fetch_sub(1, Ordering::AcqRel);
                    return Err(Error::resource_exhausted(
                        "Arena cursor advance overflow".to_string(),
                    ));
                }
            };

            if candidate_new > self.cap {
                self.alloc_count.fetch_sub(1, Ordering::AcqRel);
                return Err(Error::resource_exhausted(format!(
                    "Arena cap of {} bytes exceeded (would consume {} bytes)",
                    self.cap, candidate_new
                )));
            }

            match self.cursor.compare_exchange_weak(
                current,
                candidate_new,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => {
                    aligned = candidate_aligned;
                    new_cursor = candidate_new;
                    // `new_cursor` is only needed by the SAFETY
                    // comment below; read it once so the assignment
                    // is not flagged as unused.
                    let _ = new_cursor;
                    break;
                }
                Err(observed) => {
                    current = observed;
                    // Retry with the freshly observed cursor.
                }
            }
        }

        // SAFETY:
        //
        // - `self.base` points to a `MAX_ALIGN`-aligned allocation of
        //   `self.cap` bytes owned by the `Buffer` inside
        //   `self.buffer`, which lives at least as long as `&self`.
        // - We just CAS-claimed the byte range `aligned..new_cursor`,
        //   so no other in-flight `alloc` call can claim any byte
        //   inside it (the cursor is monotonically non-decreasing
        //   under successful CAS, so a subsequent winner observes a
        //   `current` >= our `new_cursor`).
        // - `aligned + count*size_of::<T>() <= self.cap` (just checked
        //   above), so the byte range is in-bounds of the allocation.
        // - `aligned` is a multiple of `align_of::<T>()` and `MAX_ALIGN
        //   >= align_of::<T>()` (compile-time assert above), so `base
        //   + aligned` is `T`-aligned (true even for `count == 0`).
        // - We derive the typed pointer from `self.base.as_ptr()`, not
        //   from a fresh borrow of the whole buffer, so this slice
        //   does not invalidate any other thread's previously
        //   returned `&mut [T]` under stacked borrows.
        // - `T: Zeroable` and the buffer bytes are zero, so the
        //   `&mut [T]` references valid `T` values.
        let slice: &mut [T] = unsafe {
            let elem_ptr = self.base.as_ptr().add(aligned).cast::<T>();
            std::slice::from_raw_parts_mut(elem_ptr, count)
        };

        Ok(slice)
    }

    /// Reset the arena to empty without releasing its buffer to the
    /// pool. Useful for a decoder that wants to reuse the same arena
    /// across several intermediate stages of the same frame. Callers
    /// must ensure no slice previously returned from [`Arena::alloc`]
    /// is still in use — Rust's borrow checker enforces this, since
    /// `reset` takes `&mut self`.
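    ///
    /// A short sketch (marked `ignore`; sizes are arbitrary):
    ///
    /// ```ignore
    /// let pool = ArenaPool::new(1, 32);
    /// let mut arena = pool.lease().unwrap();
    /// arena.alloc::<u8>(32).unwrap();          // arena now full
    /// assert!(arena.alloc::<u8>(1).is_err());
    /// arena.reset();                            // cursor back to zero
    /// arena.alloc::<u8>(32).unwrap();           // same buffer, reused
    /// ```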
    pub fn reset(&mut self) {
        // `&mut self` proves exclusive access; non-atomic stores
        // would suffice, but the atomic API is uniform.
        self.cursor.store(0, Ordering::Release);
        self.alloc_count.store(0, Ordering::Release);
    }
}

impl Drop for Arena {
    fn drop(&mut self) {
        // We're in Drop with `&mut self`, so no `alloc`-returned
        // slices can still be borrowing from `base` and no other
        // thread can be in `alloc`. Take the buffer out of the mutex
        // and either return it to the pool or let it free here.
        let taken = self.buffer.get_mut().ok().and_then(|slot| slot.take());
        if let Some(buffer) = taken {
            if let Some(pool) = self.pool.upgrade() {
                pool.release(buffer);
            } else {
                // Pool was dropped before us — buffer drops here and
                // its allocation is freed via `Buffer::Drop`.
                drop(buffer);
            }
        }
    }
}

/// Round `n` up to the next multiple of `align`. `align` must be a
/// power of two. Returns `None` on overflow.
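///
/// For example, `align_up(5, 4) == Some(8)`, `align_up(8, 4) == Some(8)`,
/// and `align_up(usize::MAX, 8) == None`.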
fn align_up(n: usize, align: usize) -> Option<usize> {
    debug_assert!(align.is_power_of_two(), "alignment must be a power of two");
    let mask = align - 1;
    n.checked_add(mask).map(|m| m & !mask)
}

/// The owned body of a refcounted [`Frame`]. `Send + Sync`.
///
/// Holds a [`sync::Arena`](Arena) (the bytes), a fixed-size table of
/// `(offset_in_arena, length_in_bytes)` pairs (one per plane), and a
/// [`FrameHeader`]. The `plane_count` field tracks how many entries
/// of `plane_offsets` are actually populated. Up to [`MAX_PLANES`]
/// planes are supported.
///
/// **Lifetime:** an [`Arena`] returns its buffer to the pool when
/// dropped. An `Arc<FrameInner>` keeps the arena alive via its single
/// owned field, so as long as any clone of a [`Frame`] exists the
/// underlying buffer stays out of the pool.
pub struct FrameInner {
    arena: Arena,
    plane_offsets: [(usize, usize); MAX_PLANES],
    plane_count: u8,
    header: FrameHeader,
}

/// Refcounted handle to a decoded video frame. `Send + Sync`.
///
/// Construct via [`FrameInner::new`]; clone freely (each clone bumps
/// the atomic refcount by 1). The arena and its buffer are released
/// back to the pool when the last clone is dropped.
///
/// Use this type when the decoder hands frames to a different thread
/// from the one that produced them. For same-thread decode/consume,
/// the cheaper [`crate::arena::Frame`] (`Rc`-backed) is preferable.
pub type Frame = Arc<FrameInner>;

impl FrameInner {
    /// Construct a `Frame` (`Arc<FrameInner>`) from an arena, a slice
    /// of `(offset, length)` plane descriptors, and a header. Returns
    /// [`Error::InvalidData`] if more than [`MAX_PLANES`] planes are
    /// supplied or if any plane range falls outside the arena's used
    /// region.
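    ///
    /// A minimal sketch (marked `ignore`; the `PixelFormat` variant
    /// and pool sizes are assumptions of this example):
    ///
    /// ```ignore
    /// let pool = ArenaPool::new(1, 256);
    /// let arena = pool.lease().unwrap();
    /// let plane: &mut [u8] = arena.alloc::<u8>(16).unwrap();
    /// plane.fill(0x10);
    /// let header = FrameHeader::new(4, 4, PixelFormat::Gray8, None);
    /// // One plane covering bytes 0..16 of the arena's used region.
    /// let frame = FrameInner::new(arena, &[(0, 16)], header).unwrap();
    /// assert_eq!(frame.plane_count(), 1);
    /// assert_eq!(frame.plane(0).unwrap().len(), 16);
    /// ```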
    pub fn new(arena: Arena, planes: &[(usize, usize)], header: FrameHeader) -> Result<Frame> {
        if planes.len() > MAX_PLANES {
            return Err(Error::invalid(format!(
                "FrameInner supports at most {} planes (got {})",
                MAX_PLANES,
                planes.len()
            )));
        }
        let used = arena.used();
        for (i, (off, len)) in planes.iter().enumerate() {
            let end = off
                .checked_add(*len)
                .ok_or_else(|| Error::invalid(format!("plane {i}: offset+len overflow")))?;
            if end > used {
                return Err(Error::invalid(format!(
                    "plane {i}: range {off}..{end} exceeds arena used={used}"
                )));
            }
        }
        let mut plane_offsets = [(0usize, 0usize); MAX_PLANES];
        for (i, p) in planes.iter().enumerate() {
            plane_offsets[i] = *p;
        }
        Ok(Arc::new(FrameInner {
            arena,
            plane_offsets,
            plane_count: planes.len() as u8,
            header,
        }))
    }

    /// Number of planes this frame holds.
    pub fn plane_count(&self) -> usize {
        self.plane_count as usize
    }

    /// Read-only access to plane `i`. Returns `None` if `i` is out of
    /// range.
    pub fn plane(&self, i: usize) -> Option<&[u8]> {
        if i >= self.plane_count as usize {
            return None;
        }
        let (off, len) = self.plane_offsets[i];
        // SAFETY:
        // - plane ranges were validated against `arena.used()` at
        //   construction (`off + len <= arena.cursor`), and the
        //   cursor only advances, so the byte range is in-bounds.
        // - The bytes were written by `alloc` and never moved (the
        //   buffer's allocation is stable for the arena's lifetime).
        // - We derive the slice from the raw base pointer, never via
        //   a re-borrow of the whole buffer, so this `&[u8]` does not
        //   invalidate any other slice the caller is holding.
        // - The borrow lifetime is bounded by `&self`.
        let buf: &[u8] = unsafe {
            let elem_ptr = self.arena.base.as_ptr().add(off);
            std::slice::from_raw_parts(elem_ptr, len)
        };
        Some(buf)
    }

    /// Frame header (width / height / pixel format / pts).
    pub fn header(&self) -> &FrameHeader {
        &self.header
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::format::PixelFormat;

    fn assert_send_sync<T: Send + Sync>() {}

    #[test]
    fn types_are_send_sync() {
        // The whole point of this module: prove the public types
        // satisfy the cross-thread contract that `crate::arena` does
        // not.
        assert_send_sync::<ArenaPool>();
        assert_send_sync::<Arc<ArenaPool>>();
        assert_send_sync::<Arena>();
        assert_send_sync::<FrameInner>();
        assert_send_sync::<Frame>();
    }

    fn small_pool(slots: usize, cap: usize) -> Arc<ArenaPool> {
        ArenaPool::new(slots, cap)
    }

    #[test]
    fn pool_lease_returns_err_when_exhausted() {
        let pool = small_pool(2, 1024);
        let a = pool.lease().expect("first lease");
        let b = pool.lease().expect("second lease");
        let third = pool.lease();
        assert!(matches!(third, Err(Error::ResourceExhausted(_))));
        drop((a, b));
    }

    #[test]
    fn arena_alloc_caps_at_size_limit() {
        let pool = small_pool(1, 64);
        let arena = pool.lease().unwrap();
        let _: &mut [u8] = arena.alloc::<u8>(32).unwrap();
        let _: &mut [u8] = arena.alloc::<u8>(32).unwrap();
        let third = arena.alloc::<u8>(1);
        assert!(matches!(third, Err(Error::ResourceExhausted(_))));
    }

    #[test]
    fn arena_alloc_count_cap_fires() {
        let pool = ArenaPool::with_alloc_count_cap(1, 1024, 3);
        let arena = pool.lease().unwrap();
        let _: &mut [u8] = arena.alloc::<u8>(1).unwrap();
        let _: &mut [u8] = arena.alloc::<u8>(1).unwrap();
        let _: &mut [u8] = arena.alloc::<u8>(1).unwrap();
        assert!(arena.alloc_count_exceeded());
        let fourth = arena.alloc::<u8>(1);
        assert!(matches!(fourth, Err(Error::ResourceExhausted(_))));
        // Counter must remain at the cap even after a refused alloc
        // — no drift from the rollback path.
        assert_eq!(arena.alloc_count(), 3);
    }

    #[test]
    fn arena_returns_to_pool_on_drop() {
        let pool = small_pool(1, 256);
        {
            let arena = pool.lease().expect("first lease");
            assert!(matches!(pool.lease(), Err(Error::ResourceExhausted(_))));
            drop(arena);
        }
        let _again = pool.lease().expect("re-lease after drop");
    }

    #[test]
    fn arena_alignment_is_respected() {
        let pool = small_pool(1, 64);
        let arena = pool.lease().unwrap();
        let _: &mut [u8] = arena.alloc::<u8>(1).unwrap();
        let s: &mut [u32] = arena.alloc::<u32>(4).unwrap();
        let addr = s.as_ptr() as usize;
        assert_eq!(addr % align_of::<u32>(), 0);
        assert_eq!(s.len(), 4);
    }

    #[cfg(miri)]
    #[test]
    fn arena_alloc_zero_len_slice_is_aligned() {
        let pool = small_pool(1, 0);
        let arena = pool.lease().unwrap();

        // Regression test. Pre-fix, the arena's backing allocation was
        // a plain `Box<[u8]>`, so its base pointer was only guaranteed
        // to be byte-aligned, and `alloc::<T>` aligned only the byte
        // offset, not the absolute address, before constructing
        // `&mut [T]`. The empty-buffer case made this deterministic:
        // even an empty `&mut [u32]` must carry an aligned pointer,
        // but a zero-length `Box<[u8]>` hands out an alignment-1
        // dangling pointer. Post-fix, pool buffers are `MAX_ALIGN`-
        // aligned, so this must pass under Miri.
        let _s: &mut [u32] = arena.alloc::<u32>(0).unwrap();
    }

    // Pre-fix this test was:
    //
    //     let values = arena.alloc::<std::num::NonZeroU8>(1).unwrap();
    //     let _ = values[0].get();
    //
    // and failed under Miri because pool buffers are zero-filled and
    // zero is not a valid `NonZeroU8`. Post-fix, the `Zeroable` bound
    // on `Arena::alloc` makes that call a hard *compile* error — the
    // strongest possible enforcement. The test below is a regression
    // assertion that the bound stays as-or-stricter than `Zeroable`:
    // if a future refactor weakened it back to `Copy`, the
    // commented-out call site would start compiling again and Miri
    // would once again accept the invalid bit pattern.
    #[cfg(miri)]
    #[test]
    fn arena_alloc_rejects_non_zeroable_types() {
        // `requires_zeroable::<NonZeroU8>()` would not compile —
        // `NonZeroU8: !Zeroable`. Sanity-check the helper itself with
        // a known zeroable type so the test is an actual exercise.
        fn requires_zeroable<T: bytemuck::Zeroable>() {}
        requires_zeroable::<u8>();
        // Uncommenting the next line must fail to compile:
        //   requires_zeroable::<std::num::NonZeroU8>();
    }

    #[cfg(miri)]
    #[test]
    fn arena_alloc_second_slice_does_not_invalidate_first() {
        let pool = small_pool(1, 2);
        let arena = pool.lease().unwrap();

        // Regression test. Pre-fix, each `alloc` called
        // `[u8]::as_mut_ptr` on the whole backing slice before carving
        // out the requested subslice. That materialised a new mutable
        // borrow of the whole buffer and invalidated previously
        // returned `&mut` slices, even when the byte ranges were
        // disjoint. Post-fix, the typed pointer is derived from the
        // cached `base` pointer, so writing through both slices must
        // pass under Miri.
        let first = arena.alloc::<u8>(1).unwrap();
        let second = arena.alloc::<u8>(1).unwrap();
        first[0] = 1;
        second[0] = 2;
    }

    fn build_simple_frame(pool: &Arc<ArenaPool>) -> Frame {
        let arena = pool.lease().unwrap();
        let plane0: &mut [u8] = arena.alloc::<u8>(16).unwrap();
        for (i, b) in plane0.iter_mut().enumerate() {
            *b = i as u8;
        }
        let header = FrameHeader::new(4, 4, PixelFormat::Gray8, Some(42));
        FrameInner::new(arena, &[(0, 16)], header).unwrap()
    }

    #[test]
    fn frame_refcount_keeps_arena_alive() {
        let pool = small_pool(1, 256);
        let frame = build_simple_frame(&pool);
        let clone = Arc::clone(&frame);
        drop(frame);
        let plane = clone.plane(0).expect("plane 0");
        assert_eq!(plane.len(), 16);
        for (i, b) in plane.iter().enumerate() {
            assert_eq!(*b, i as u8);
        }
        assert_eq!(clone.header().width, 4);
        assert_eq!(clone.header().height, 4);
        assert_eq!(clone.header().presentation_timestamp, Some(42));
        assert!(matches!(pool.lease(), Err(Error::ResourceExhausted(_))));
    }

    #[test]
    fn last_drop_returns_arena_to_pool() {
        let pool = small_pool(1, 256);
        let frame = build_simple_frame(&pool);
        let clone = Arc::clone(&frame);
        drop(frame);
        drop(clone);
        let _again = pool.lease().expect("lease after last drop");
    }

    #[test]
    fn frame_rejects_too_many_planes() {
        let pool = small_pool(1, 256);
        let arena = pool.lease().unwrap();
        let header = FrameHeader::new(1, 1, PixelFormat::Gray8, None);
        let too_many = vec![(0usize, 0usize); MAX_PLANES + 1];
        let r = FrameInner::new(arena, &too_many, header);
        assert!(matches!(r, Err(Error::InvalidData(_))));
    }

    #[test]
    fn frame_rejects_plane_outside_arena() {
        let pool = small_pool(1, 64);
        let arena = pool.lease().unwrap();
        let header = FrameHeader::new(1, 1, PixelFormat::Gray8, None);
        let r = FrameInner::new(arena, &[(0, 16)], header);
        assert!(matches!(r, Err(Error::InvalidData(_))));
    }

    #[test]
    fn pool_outlives_buffer_drop_when_pool_dropped_first() {
        let pool = small_pool(1, 64);
        let arena = pool.lease().unwrap();
        drop(pool);
        drop(arena);
    }

    #[test]
    fn arena_reset_clears_allocations() {
        let pool = small_pool(1, 32);
        let mut arena = pool.lease().unwrap();
        let _: &mut [u8] = arena.alloc::<u8>(32).unwrap();
        assert!(matches!(
            arena.alloc::<u8>(1),
            Err(Error::ResourceExhausted(_))
        ));
        arena.reset();
        let _: &mut [u8] = arena.alloc::<u8>(32).unwrap();
    }

    #[test]
    fn frame_can_be_sent_across_thread_boundary() {
        // Build a frame on this thread, ship it to a worker thread,
        // read its bytes there. This is the use case the module
        // exists to enable; if it ever stops compiling, the
        // `Send + Sync` impls above are wrong.
        let pool = small_pool(1, 256);
        let frame = build_simple_frame(&pool);
        let frame_for_worker = Arc::clone(&frame);
        let handle = std::thread::spawn(move || {
            let plane = frame_for_worker.plane(0).expect("plane 0 on worker");
            let mut sum: u32 = 0;
            for b in plane {
                sum += *b as u32;
            }
            sum
        });
        let sum = handle.join().expect("worker joined");
        // Plane was filled with 0..16, sum = 120.
        assert_eq!(sum, (0..16u32).sum::<u32>());
        // Original frame still readable here too.
        assert_eq!(frame.plane(0).unwrap().len(), 16);
    }

    #[test]
    fn concurrent_alloc_produces_disjoint_slices() {
        // Two threads alloc 64 bytes each from a 256-byte arena.
        // Their slices must not overlap.
        let pool = small_pool(1, 256);
        let arena = Arc::new(pool.lease().unwrap());
        let a = Arc::clone(&arena);
        let b = Arc::clone(&arena);
        let h1 = std::thread::spawn(move || {
            let s: &mut [u8] = a.alloc::<u8>(64).unwrap();
            // Fill so we can detect overlap from the other thread.
            for x in s.iter_mut() {
                *x = 0xAA;
            }
            (s.as_ptr() as usize, s.len())
        });
        let h2 = std::thread::spawn(move || {
            let s: &mut [u8] = b.alloc::<u8>(64).unwrap();
            for x in s.iter_mut() {
                *x = 0xBB;
            }
            (s.as_ptr() as usize, s.len())
        });
        let (p1, l1) = h1.join().unwrap();
        let (p2, l2) = h2.join().unwrap();
        // Disjoint ranges: [p1, p1+l1) and [p2, p2+l2) do not overlap.
        let no_overlap = p1 + l1 <= p2 || p2 + l2 <= p1;
        assert!(no_overlap, "concurrent alloc returned overlapping slices");
    }

    #[cfg(miri)]
    #[test]
    fn concurrent_alloc_does_not_invalidate_other_threads_slice() {
        // Regression test. `Arena` is `Sync`, so safe code can call
        // `alloc` while another thread writes through a previously
        // returned slice. Pre-fix, `alloc` materialised a mutable
        // borrow of the whole buffer via `[u8]::as_mut_ptr`, and Miri
        // reported that retag as a data race with the other thread's
        // write even though the CAS cursor kept the byte ranges
        // disjoint. Post-fix, the typed pointer is derived from the
        // cached `base` pointer, so this interleaving must pass under
        // Miri.
        let pool = small_pool(1, 256);
        let arena = Arc::new(pool.lease().unwrap());
        let barrier = Arc::new(std::sync::Barrier::new(2));
        let a = Arc::clone(&arena);
        let b = Arc::clone(&arena);
        let barrier_a = Arc::clone(&barrier);
        let barrier_b = Arc::clone(&barrier);
        let h1 = std::thread::spawn(move || {
            let s: &mut [u8] = a.alloc::<u8>(64).unwrap();
            barrier_a.wait();
            for x in s.iter_mut() {
                *x = 0xAA;
            }
        });
        let h2 = std::thread::spawn(move || {
            barrier_b.wait();
            let s: &mut [u8] = b.alloc::<u8>(64).unwrap();
            for x in s.iter_mut() {
                *x = 0xBB;
            }
        });
        h1.join().unwrap();
        h2.join().unwrap();
    }
873}