// oxideav_core/arena/sync.rs

//! `Send + Sync` mirror of the parent [`crate::arena`] module.
//!
//! This module exposes the same four-type API ([`ArenaPool`],
//! [`Arena`], [`Frame`], [`FrameInner`]) as its sibling, with one
//! difference that ripples through the whole shape:
//!
//! - [`Arena`] uses `AtomicUsize` / `AtomicU32` for its bump cursor
//!   and allocation counter (instead of `Cell<usize>` / `Cell<u32>`),
//!   and is therefore `Send + Sync`.
//! - [`Frame`] is `Arc<FrameInner>` (instead of `Rc<FrameInner>`),
//!   so a decoded frame can be moved or shared across threads.
//! - [`FrameInner`] holds a sync [`Arena`], so it is itself `Send +
//!   Sync` and `Arc<FrameInner>: Send + Sync` falls out for free.
//!
//! ## When to use which
//!
//! Use [`crate::arena`] (the `Rc` variant) when the decoder produces
//! frames on the same thread that consumes them. The bump cursor is
//! a plain `Cell<usize>` and there are no atomic operations on the
//! hot allocation path.
//!
//! Use this module (the `Arc` variant) when the decoder hands frames
//! to a different thread — the typical case for a pipeline that
//! decodes on one worker and renders / encodes / transmits on
//! another. The cost is an atomic load plus a CAS per allocation and
//! an atomic refcount per frame clone; both are negligible compared
//! to the actual decode work.
//!
//! ## Concurrent allocation contract
//!
//! [`Arena::alloc`] uses a CAS loop on the cursor, so two threads
//! that both call [`Arena::alloc`] on the same `&Arena` will receive
//! disjoint slices (the loser of the CAS retries against the new
//! cursor). The returned `&mut [T]` points into a region that no
//! other in-flight `alloc()` call can also receive, and the slice's
//! lifetime is bounded by the borrow of `&self`.
//!
//! In practice the typical pattern is **one decoder thread allocates,
//! then freezes into a [`Frame`] which is shared read-only across
//! threads** — concurrent allocation is supported but rarely useful.
//! The bytes returned by [`Arena::alloc`] are not zero-initialised (a
//! re-leased buffer may still hold data from its previous use), so
//! callers must fully overwrite them before reading.
//!
//! Everything else (per-arena byte cap, per-arena allocation-count
//! cap, weak handle back to the pool for `Drop`-time release, the
//! `FrameHeader` shape, plane validation in [`FrameInner::new`])
//! matches the parent module exactly.
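//!
//! ## Example
//!
//! A minimal sketch of the intended flow (illustrative, not compiled
//! as a doctest; it mirrors the tests at the bottom of this file):
//!
//! ```ignore
//! use std::sync::Arc;
//! use crate::format::PixelFormat;
//!
//! let pool = ArenaPool::new(/* max_arenas */ 4, /* cap_per_arena */ 1 << 20);
//! let arena = pool.lease()?;
//!
//! // Decode thread fills one plane.
//! let plane: &mut [u8] = arena.alloc::<u8>(16)?;
//! plane.fill(0);
//!
//! // Freeze into a shareable frame and hand it to a worker thread.
//! let header = FrameHeader::new(4, 4, PixelFormat::Gray8, Some(0));
//! let frame = FrameInner::new(arena, &[(0, 16)], header)?;
//! let worker = std::thread::spawn({
//!     let frame = Arc::clone(&frame);
//!     move || frame.plane(0).expect("plane 0").len()
//! });
//! assert_eq!(worker.join().unwrap(), 16);
//! ```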

use std::cell::UnsafeCell;
use std::mem::{align_of, size_of};
use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, Weak};

use crate::error::{Error, Result};

// Re-export the shared `FrameHeader` and `MAX_PLANES` constant so
// users of either arena module see the same metadata shape — there is
// no thread-safety angle to either of them, and duplicating them
// would only add drift.
pub use super::{FrameHeader, MAX_PLANES};

/// `Send + Sync` pool of reusable byte buffers for arena-backed frame
/// allocations. Mirrors [`crate::arena::ArenaPool`] in shape and
/// behaviour; the only difference is that the [`Arena`] (and the
/// [`Frame`] holding it) handed out are themselves `Send + Sync`.
///
/// Construct via [`ArenaPool::new`]. Lease an [`Arena`] per frame via
/// [`ArenaPool::lease`]; drop the arena (or drop the last clone of a
/// [`Frame`] holding it) to return its buffer to the pool.
pub struct ArenaPool {
    inner: Mutex<PoolInner>,
    cap_per_arena: usize,
    max_arenas: usize,
    max_alloc_count_per_arena: u32,
}

struct PoolInner {
    /// Buffers currently sitting idle in the pool (ready to lease).
    idle: Vec<Box<[u8]>>,
    /// Total buffers ever allocated by this pool (idle + in-flight).
    /// Caps lazy growth at `max_arenas`.
    total_allocated: usize,
}

impl ArenaPool {
    /// Construct a new pool with `max_arenas` buffer slots, each of
    /// `cap_per_arena` bytes. Buffers are allocated lazily on first
    /// lease — a freshly constructed pool holds no memory.
    ///
    /// Per-arena allocation count is capped at a generous 1 M
    /// (override via [`ArenaPool::with_alloc_count_cap`]).
    pub fn new(max_arenas: usize, cap_per_arena: usize) -> Arc<Self> {
        Self::with_alloc_count_cap(max_arenas, cap_per_arena, 1_000_000)
    }

    /// Like [`ArenaPool::new`] but lets the caller set the per-arena
    /// allocation-count cap. Useful when the caller is plumbing
    /// [`crate::DecoderLimits`] through.
    pub fn with_alloc_count_cap(
        max_arenas: usize,
        cap_per_arena: usize,
        max_alloc_count_per_arena: u32,
    ) -> Arc<Self> {
        Arc::new(Self {
            inner: Mutex::new(PoolInner {
                idle: Vec::with_capacity(max_arenas),
                total_allocated: 0,
            }),
            cap_per_arena,
            max_arenas,
            max_alloc_count_per_arena,
        })
    }

    /// Capacity of each arena buffer this pool hands out, in bytes.
    pub fn cap_per_arena(&self) -> usize {
        self.cap_per_arena
    }

    /// Maximum number of arenas that may be checked out at once.
    pub fn max_arenas(&self) -> usize {
        self.max_arenas
    }

    /// Lease one arena from the pool. Returns
    /// [`Error::ResourceExhausted`] if every arena slot is already
    /// checked out by an [`Arena`] (or a [`Frame`] holding one).
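    ///
    /// A minimal sketch (illustrative, not compiled as a doctest; it
    /// mirrors `pool_lease_returns_err_when_exhausted` in the tests
    /// below):
    ///
    /// ```ignore
    /// let pool = ArenaPool::new(1, 4096);
    /// let arena = pool.lease()?;        // ok
    /// assert!(pool.lease().is_err());   // every slot checked out
    /// drop(arena);                      // buffer goes back to the pool
    /// let arena = pool.lease()?;        // ok again
    /// ```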
    pub fn lease(self: &Arc<Self>) -> Result<Arena> {
        let buffer = {
            let mut inner = self.inner.lock().expect("ArenaPool mutex poisoned");
            if let Some(buf) = inner.idle.pop() {
                buf
            } else if inner.total_allocated < self.max_arenas {
                inner.total_allocated += 1;
                vec![0u8; self.cap_per_arena].into_boxed_slice()
            } else {
                return Err(Error::resource_exhausted(format!(
                    "ArenaPool exhausted: all {} arenas checked out",
                    self.max_arenas
                )));
            }
        };

        Ok(Arena {
            buffer: UnsafeCell::new(buffer),
            cursor: AtomicUsize::new(0),
            alloc_count: AtomicU32::new(0),
            cap: self.cap_per_arena,
            alloc_count_cap: self.max_alloc_count_per_arena,
            pool: Arc::downgrade(self),
        })
    }

    /// Return a buffer to the idle list. Called from `Arena`'s `Drop`
    /// impl; not part of the public API.
    fn release(&self, buffer: Box<[u8]>) {
        if let Ok(mut inner) = self.inner.lock() {
            inner.idle.push(buffer);
        }
        // If the lock is poisoned, drop the buffer normally — the
        // pool is in an unusable state already.
    }
}

/// One leased buffer from an [`ArenaPool`]. `Send + Sync`.
///
/// Allocations are bump-pointer on an atomic cursor: each call to
/// [`Arena::alloc`] CAS-advances the cursor and returns a fresh
/// aligned slice carved out of the buffer at the old position. There
/// is no per-allocation header and no individual free — the entire
/// arena is reset (returned to the pool) only when the `Arena` is
/// dropped.
///
/// Concurrent calls to [`Arena::alloc`] on the same `&Arena` are
/// supported and produce disjoint slices (the CAS loser retries
/// against the new cursor). See the module docs for the full
/// concurrency contract.
pub struct Arena {
    /// Backing buffer leased from the pool.
    ///
    /// Wrapped in `UnsafeCell` because [`Arena::alloc`] takes `&self`
    /// yet returns `&mut [T]` slices that borrow into non-overlapping
    /// regions of the same buffer. The atomic cursor below guarantees
    /// the regions returned by successive (or concurrent) `alloc`
    /// calls never overlap.
    buffer: UnsafeCell<Box<[u8]>>,
    /// Atomic bump cursor: the next free byte offset within `buffer`.
    cursor: AtomicUsize,
    /// Atomic allocation counter.
    alloc_count: AtomicU32,
    /// Cached cap (== `pool.cap_per_arena` at lease time).
    cap: usize,
    /// Cached cap (== `pool.max_alloc_count_per_arena` at lease time).
    alloc_count_cap: u32,
    /// Weak handle back to the pool so `Drop` can return the buffer.
    pool: Weak<ArenaPool>,
}

// SAFETY: `Arena` owns its `Box<[u8]>` (no shared ownership of the
// buffer with anything else) and all mutation goes through the atomic
// cursor + the (correctly synchronised) `UnsafeCell`. The public
// `alloc` API uses a CAS loop so concurrent allocators get disjoint
// regions; the `Drop` path moves the buffer out under exclusive
// access (`&mut self`). Sending the arena across threads is therefore
// sound.
unsafe impl Send for Arena {}
// SAFETY: through a shared `&Arena`, mutation happens only via the
// atomic cursor and the allocation counter (themselves `Sync`) and
// via writes into a region of the `UnsafeCell` buffer that no other
// in-flight `alloc` call has been handed (the CAS guarantees disjoint
// regions). Two threads holding `&Arena` therefore cannot produce
// overlapping `&mut [T]` slices.
unsafe impl Sync for Arena {}

impl Arena {
    /// Capacity of this arena in bytes.
    pub fn capacity(&self) -> usize {
        self.cap
    }

    /// Bytes consumed by allocations so far.
    pub fn used(&self) -> usize {
        self.cursor.load(Ordering::Acquire)
    }

    /// Number of allocations performed so far.
    pub fn alloc_count(&self) -> u32 {
        self.alloc_count.load(Ordering::Acquire)
    }

    /// `true` once the per-arena allocation-count cap has been
    /// reached. Decoders that produce many small allocations should
    /// poll this and bail with [`Error::ResourceExhausted`] when it
    /// flips, instead of waiting for the next [`Arena::alloc`] call
    /// to fail.
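    ///
    /// Illustrative polling sketch (not compiled as a doctest;
    /// `next_block()` is a stand-in for whatever the decoder's inner
    /// loop actually iterates over):
    ///
    /// ```ignore
    /// while let Some(block) = next_block() {
    ///     if arena.alloc_count_exceeded() {
    ///         return Err(Error::resource_exhausted(
    ///             "per-arena allocation-count cap reached".to_string(),
    ///         ));
    ///     }
    ///     let dst = arena.alloc::<u8>(block.len())?;
    ///     dst.copy_from_slice(block);
    /// }
    /// ```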
    pub fn alloc_count_exceeded(&self) -> bool {
        self.alloc_count.load(Ordering::Acquire) >= self.alloc_count_cap
    }

    /// Allocate `count` `T`s out of this arena. Returns a borrowed
    /// `&mut [T]` (lifetime bounded by the borrow of `self`). The
    /// bytes are not zero-initialised — the caller is responsible
    /// for fully writing the returned slice before reading it.
    ///
    /// Returns [`Error::ResourceExhausted`] if either the per-arena
    /// byte cap or the per-arena allocation-count cap would be
    /// exceeded.
    ///
    /// # Safety / contract
    ///
    /// `T` must be a "plain old data" type with no `Drop` glue and
    /// no invariants that need a constructor — typically `u8`, `i16`,
    /// `u32`, `f32`, etc. The arena does not run destructors on
    /// allocated values. This is enforced via a `T: Copy` bound.
    ///
    /// **Concurrency:** the bump cursor is advanced via a CAS loop,
    /// so concurrent `alloc` calls on the same `&Arena` produce
    /// disjoint slices. The CAS loser retries against the new
    /// cursor; in the uncontended case the cost is a single atomic
    /// load plus one successful CAS.
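    ///
    /// A minimal sketch (illustrative, not compiled as a doctest;
    /// `arena` is a leased [`Arena`], as in the tests below):
    ///
    /// ```ignore
    /// let samples: &mut [i16] = arena.alloc::<i16>(1024)?;
    /// samples.fill(0); // not zero-initialised: write before reading
    /// ```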
    #[allow(clippy::mut_from_ref)] // see "Concurrency" doc above.
    pub fn alloc<T>(&self, count: usize) -> Result<&mut [T]>
    where
        T: Copy,
    {
        // Allocation-count cap. Increment first; if we overshoot,
        // roll back so subsequent calls still see the correct value.
        let prev_count = self.alloc_count.fetch_add(1, Ordering::AcqRel);
        if prev_count >= self.alloc_count_cap {
            // Roll back so `alloc_count()` stays pinned at the cap
            // rather than drifting past it.
            self.alloc_count.fetch_sub(1, Ordering::AcqRel);
            return Err(Error::resource_exhausted(format!(
                "Arena alloc-count cap of {} exceeded",
                self.alloc_count_cap
            )));
        }

        let elem_size = size_of::<T>();
        let elem_align = align_of::<T>();
        // Bytes requested.
        let bytes = elem_size.checked_mul(count).ok_or_else(|| {
            // Roll back the alloc-count bump on size-overflow too.
            self.alloc_count.fetch_sub(1, Ordering::AcqRel);
            Error::resource_exhausted("Arena alloc size overflow".to_string())
        })?;

        // CAS loop on the cursor. We compute an aligned start and a
        // new cursor value from the latest observed cursor, then
        // attempt to claim that range; if another thread won the
        // race, retry against the updated cursor.
        //
        // Alignment is computed on the *address* (buffer base plus
        // offset) rather than on the offset alone: a `Box<[u8]>` only
        // guarantees 1-byte alignment for its base pointer, so an
        // offset that is a multiple of `align_of::<T>()` would not by
        // itself make the returned pointer aligned for `T`.
        let base = unsafe { (*self.buffer.get()).as_ptr() } as usize;
        let mut current = self.cursor.load(Ordering::Acquire);
        let aligned;
        loop {
            let candidate_aligned = match base
                .checked_add(current)
                .and_then(|addr| align_up(addr, elem_align))
                .map(|addr| addr - base)
            {
                Some(a) => a,
                None => {
                    self.alloc_count.fetch_sub(1, Ordering::AcqRel);
                    return Err(Error::resource_exhausted(
                        "Arena cursor alignment overflow".to_string(),
                    ));
                }
            };
            let candidate_new = match candidate_aligned.checked_add(bytes) {
                Some(n) => n,
                None => {
                    self.alloc_count.fetch_sub(1, Ordering::AcqRel);
                    return Err(Error::resource_exhausted(
                        "Arena cursor advance overflow".to_string(),
                    ));
                }
            };

            if candidate_new > self.cap {
                self.alloc_count.fetch_sub(1, Ordering::AcqRel);
                return Err(Error::resource_exhausted(format!(
                    "Arena cap of {} bytes exceeded (would consume {} bytes)",
                    self.cap, candidate_new
                )));
            }

            match self.cursor.compare_exchange_weak(
                current,
                candidate_new,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => {
                    aligned = candidate_aligned;
                    break;
                }
                Err(observed) => {
                    // Another thread won the race; retry with the
                    // freshly observed cursor.
                    current = observed;
                }
            }
        }

        // SAFETY: we have just CAS-claimed the byte range
        // `aligned..aligned + bytes`. No other in-flight `alloc` call
        // can claim any byte inside that range (the cursor is
        // monotonically non-decreasing under successful CAS, so a
        // subsequent winner observes a cursor >= our claimed end and
        // therefore proposes an aligned start >= that end). The range
        // lies within the buffer (`aligned + bytes <= cap`), the
        // buffer outlives the borrow of `&self`, and `base + aligned`
        // is aligned for `T` by construction. `T: Copy` means there
        // are no previous contents that would need dropping.
        let slice: &mut [T] = unsafe {
            let buf_ptr = (*self.buffer.get()).as_mut_ptr();
            let elem_ptr = buf_ptr.add(aligned).cast::<T>();
            std::slice::from_raw_parts_mut(elem_ptr, count)
        };

        Ok(slice)
    }

    /// Reset the arena to empty without releasing its buffer to the
    /// pool. Useful for a decoder that wants to reuse the same arena
    /// across several intermediate stages of the same frame. Callers
    /// must ensure no slice previously returned from [`Arena::alloc`]
    /// is still in use — Rust's borrow checker enforces this, since
    /// `reset` takes `&mut self`.
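    ///
    /// Illustrative sketch (not compiled as a doctest; mirrors
    /// `arena_reset_clears_allocations` in the tests below):
    ///
    /// ```ignore
    /// let mut arena = pool.lease()?;
    /// let scratch = arena.alloc::<u8>(1024)?; // intermediate stage
    /// // ... use `scratch`, then, once that borrow has ended:
    /// arena.reset();
    /// let final_pass = arena.alloc::<u8>(1024)?;
    /// ```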
    pub fn reset(&mut self) {
        // `&mut self` proves exclusive access; non-atomic stores
        // would suffice, but the atomic API is uniform.
        self.cursor.store(0, Ordering::Release);
        self.alloc_count.store(0, Ordering::Release);
    }
}

impl Drop for Arena {
    fn drop(&mut self) {
        // `drop` takes `&mut self`, so no other references into the
        // buffer can exist and the safe `UnsafeCell::get_mut` accessor
        // can take the buffer out without an `unsafe` block.
        let buffer = std::mem::take(self.buffer.get_mut());
        if let Some(pool) = self.pool.upgrade() {
            pool.release(buffer);
        }
        // else: pool was dropped before us — buffer drops here.
    }
}

/// Round `n` up to the next multiple of `align`. `align` must be a
/// power of two. Returns `None` on overflow.
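/// For example, `align_up(5, 4) == Some(8)`, `align_up(8, 4) == Some(8)`,
/// and `align_up(usize::MAX, 4) == None`.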
fn align_up(n: usize, align: usize) -> Option<usize> {
    debug_assert!(align.is_power_of_two(), "alignment must be a power of two");
    let mask = align - 1;
    n.checked_add(mask).map(|m| m & !mask)
}

/// The owned body of a refcounted [`Frame`]. `Send + Sync`.
///
/// Holds a [`sync::Arena`](Arena) (the bytes), a fixed-size table of
/// `(offset_in_arena, length_in_bytes)` pairs (one per plane), and a
/// [`FrameHeader`]. The `plane_count` field tracks how many entries
/// of `plane_offsets` are actually populated. Up to [`MAX_PLANES`]
/// planes are supported.
///
/// **Lifetime:** an [`Arena`] returns its buffer to the pool when
/// dropped. A `FrameInner` owns its arena by value, so as long as any
/// clone of a [`Frame`] exists the underlying buffer stays out of the
/// pool.
pub struct FrameInner {
    arena: Arena,
    plane_offsets: [(usize, usize); MAX_PLANES],
    plane_count: u8,
    header: FrameHeader,
}

/// Refcounted handle to a decoded video frame. `Send + Sync`.
///
/// Construct via [`FrameInner::new`]; clone freely (each clone bumps
/// the atomic refcount by 1). The arena and its buffer are released
/// back to the pool when the last clone is dropped.
///
/// Use this type when the decoder hands frames to a different thread
/// from the one that produced them. For same-thread decode/consume,
/// the cheaper [`crate::arena::Frame`] (`Rc`-backed) is preferable.
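///
/// Illustrative sketch (not compiled as a doctest; `arena`, `len` and
/// `header` come from the surrounding decode code, and it mirrors
/// `frame_can_be_sent_across_thread_boundary` in the tests below):
///
/// ```ignore
/// let frame: Frame = FrameInner::new(arena, &[(0, len)], header)?;
/// let for_worker = Arc::clone(&frame);
/// std::thread::spawn(move || {
///     // Read-only access on the worker thread.
///     let plane = for_worker.plane(0).expect("plane 0");
///     plane.len()
/// });
/// ```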
pub type Frame = Arc<FrameInner>;

impl FrameInner {
    /// Construct a `Frame` (`Arc<FrameInner>`) from an arena, a slice
    /// of `(offset, length)` plane descriptors, and a header. Returns
    /// [`Error::InvalidData`] if more than [`MAX_PLANES`] planes are
    /// supplied or if any plane range falls outside the arena's used
    /// region.
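    ///
    /// A minimal two-plane sketch (illustrative, not compiled as a
    /// doctest; a hypothetical NV12-style layout where `width`,
    /// `height`, `header` and `pool` come from the caller):
    ///
    /// ```ignore
    /// let arena = pool.lease()?;
    /// let y_len = width * height;
    /// let uv_len = y_len / 2;
    /// arena.alloc::<u8>(y_len)?.fill(0x10);  // Y plane
    /// arena.alloc::<u8>(uv_len)?.fill(0x80); // interleaved UV plane
    /// // `u8` allocations are never padded, so the planes start at
    /// // offsets 0 and `y_len` within the arena.
    /// let frame = FrameInner::new(arena, &[(0, y_len), (y_len, uv_len)], header)?;
    /// ```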
    pub fn new(arena: Arena, planes: &[(usize, usize)], header: FrameHeader) -> Result<Frame> {
        if planes.len() > MAX_PLANES {
            return Err(Error::invalid(format!(
                "FrameInner supports at most {} planes (got {})",
                MAX_PLANES,
                planes.len()
            )));
        }
        let used = arena.used();
        for (i, (off, len)) in planes.iter().enumerate() {
            let end = off
                .checked_add(*len)
                .ok_or_else(|| Error::invalid(format!("plane {i}: offset+len overflow")))?;
            if end > used {
                return Err(Error::invalid(format!(
                    "plane {i}: range {off}..{end} exceeds arena used={used}"
                )));
            }
        }
        let mut plane_offsets = [(0usize, 0usize); MAX_PLANES];
        for (i, p) in planes.iter().enumerate() {
            plane_offsets[i] = *p;
        }
        Ok(Arc::new(FrameInner {
            arena,
            plane_offsets,
            plane_count: planes.len() as u8,
            header,
        }))
    }

    /// Number of planes this frame holds.
    pub fn plane_count(&self) -> usize {
        self.plane_count as usize
    }

    /// Read-only access to plane `i`. Returns `None` if `i` is out of
    /// range.
    pub fn plane(&self, i: usize) -> Option<&[u8]> {
        if i >= self.plane_count as usize {
            return None;
        }
        let (off, len) = self.plane_offsets[i];
        // SAFETY: plane ranges were validated against `arena.used()`
        // at construction, so `off..off + len` is in bounds. The arena
        // is owned by `self` and never exposed after construction, so
        // no further `alloc` call can hand out a `&mut` into these
        // bytes. We borrow with the lifetime of `&self`.
        let buf: &[u8] = unsafe {
            let buf_ref = &*self.arena.buffer.get();
            &(**buf_ref)[off..off + len]
        };
        Some(buf)
    }

    /// Frame header (width / height / pixel format / pts).
    pub fn header(&self) -> &FrameHeader {
        &self.header
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::format::PixelFormat;

    fn assert_send_sync<T: Send + Sync>() {}

    #[test]
    fn types_are_send_sync() {
        // The whole point of this module: prove the public types
        // satisfy the cross-thread contract that `crate::arena` does
        // not.
        assert_send_sync::<ArenaPool>();
        assert_send_sync::<Arc<ArenaPool>>();
        assert_send_sync::<Arena>();
        assert_send_sync::<FrameInner>();
        assert_send_sync::<Frame>();
    }

    fn small_pool(slots: usize, cap: usize) -> Arc<ArenaPool> {
        ArenaPool::new(slots, cap)
    }

    #[test]
    fn pool_lease_returns_err_when_exhausted() {
        let pool = small_pool(2, 1024);
        let a = pool.lease().expect("first lease");
        let b = pool.lease().expect("second lease");
        let third = pool.lease();
        assert!(matches!(third, Err(Error::ResourceExhausted(_))));
        drop((a, b));
    }

    #[test]
    fn arena_alloc_caps_at_size_limit() {
        let pool = small_pool(1, 64);
        let arena = pool.lease().unwrap();
        let _: &mut [u8] = arena.alloc::<u8>(32).unwrap();
        let _: &mut [u8] = arena.alloc::<u8>(32).unwrap();
        let third = arena.alloc::<u8>(1);
        assert!(matches!(third, Err(Error::ResourceExhausted(_))));
    }

    #[test]
    fn arena_alloc_count_cap_fires() {
        let pool = ArenaPool::with_alloc_count_cap(1, 1024, 3);
        let arena = pool.lease().unwrap();
        let _: &mut [u8] = arena.alloc::<u8>(1).unwrap();
        let _: &mut [u8] = arena.alloc::<u8>(1).unwrap();
        let _: &mut [u8] = arena.alloc::<u8>(1).unwrap();
        assert!(arena.alloc_count_exceeded());
        let fourth = arena.alloc::<u8>(1);
        assert!(matches!(fourth, Err(Error::ResourceExhausted(_))));
        // Counter must remain at the cap even after a refused alloc
        // — no drift from the rollback path.
        assert_eq!(arena.alloc_count(), 3);
    }

    #[test]
    fn arena_returns_to_pool_on_drop() {
        let pool = small_pool(1, 256);
        {
            let arena = pool.lease().expect("first lease");
            assert!(matches!(pool.lease(), Err(Error::ResourceExhausted(_))));
            drop(arena);
        }
        let _again = pool.lease().expect("re-lease after drop");
    }

    #[test]
    fn arena_alignment_is_respected() {
        let pool = small_pool(1, 64);
        let arena = pool.lease().unwrap();
        let _: &mut [u8] = arena.alloc::<u8>(1).unwrap();
        let s: &mut [u32] = arena.alloc::<u32>(4).unwrap();
        let addr = s.as_ptr() as usize;
        assert_eq!(addr % align_of::<u32>(), 0);
        assert_eq!(s.len(), 4);
    }

    fn build_simple_frame(pool: &Arc<ArenaPool>) -> Frame {
        let arena = pool.lease().unwrap();
        let plane0: &mut [u8] = arena.alloc::<u8>(16).unwrap();
        for (i, b) in plane0.iter_mut().enumerate() {
            *b = i as u8;
        }
        let header = FrameHeader::new(4, 4, PixelFormat::Gray8, Some(42));
        FrameInner::new(arena, &[(0, 16)], header).unwrap()
    }

    #[test]
    fn frame_refcount_keeps_arena_alive() {
        let pool = small_pool(1, 256);
        let frame = build_simple_frame(&pool);
        let clone = Arc::clone(&frame);
        drop(frame);
        let plane = clone.plane(0).expect("plane 0");
        assert_eq!(plane.len(), 16);
        for (i, b) in plane.iter().enumerate() {
            assert_eq!(*b, i as u8);
        }
        assert_eq!(clone.header().width, 4);
        assert_eq!(clone.header().height, 4);
        assert_eq!(clone.header().presentation_timestamp, Some(42));
        assert!(matches!(pool.lease(), Err(Error::ResourceExhausted(_))));
    }

    #[test]
    fn last_drop_returns_arena_to_pool() {
        let pool = small_pool(1, 256);
        let frame = build_simple_frame(&pool);
        let clone = Arc::clone(&frame);
        drop(frame);
        drop(clone);
        let _again = pool.lease().expect("lease after last drop");
    }

    #[test]
    fn frame_rejects_too_many_planes() {
        let pool = small_pool(1, 256);
        let arena = pool.lease().unwrap();
        let header = FrameHeader::new(1, 1, PixelFormat::Gray8, None);
        let too_many = vec![(0usize, 0usize); MAX_PLANES + 1];
        let r = FrameInner::new(arena, &too_many, header);
        assert!(matches!(r, Err(Error::InvalidData(_))));
    }

    #[test]
    fn frame_rejects_plane_outside_arena() {
        let pool = small_pool(1, 64);
        let arena = pool.lease().unwrap();
        let header = FrameHeader::new(1, 1, PixelFormat::Gray8, None);
        let r = FrameInner::new(arena, &[(0, 16)], header);
        assert!(matches!(r, Err(Error::InvalidData(_))));
    }

    #[test]
    fn arena_drop_after_pool_drop_is_ok() {
        let pool = small_pool(1, 64);
        let arena = pool.lease().unwrap();
        drop(pool);
        drop(arena);
    }

    #[test]
    fn arena_reset_clears_allocations() {
        let pool = small_pool(1, 32);
        let mut arena = pool.lease().unwrap();
        let _: &mut [u8] = arena.alloc::<u8>(32).unwrap();
        assert!(matches!(
            arena.alloc::<u8>(1),
            Err(Error::ResourceExhausted(_))
        ));
        arena.reset();
        let _: &mut [u8] = arena.alloc::<u8>(32).unwrap();
    }

    #[test]
    fn frame_can_be_sent_across_thread_boundary() {
        // Build a frame on this thread, ship it to a worker thread,
        // read its bytes there. This is the use case the module
        // exists to enable; if it ever stops compiling, the
        // `Send + Sync` impls above are wrong.
        let pool = small_pool(1, 256);
        let frame = build_simple_frame(&pool);
        let frame_for_worker = Arc::clone(&frame);
        let handle = std::thread::spawn(move || {
            let plane = frame_for_worker.plane(0).expect("plane 0 on worker");
            let mut sum: u32 = 0;
            for b in plane {
                sum += *b as u32;
            }
            sum
        });
        let sum = handle.join().expect("worker joined");
        // Plane was filled with 0..16, sum = 120.
        assert_eq!(sum, (0..16u32).sum::<u32>());
        // Original frame still readable here too.
        assert_eq!(frame.plane(0).unwrap().len(), 16);
    }

    #[test]
    fn concurrent_alloc_produces_disjoint_slices() {
        // Two threads alloc 64 bytes each from a 256-byte arena.
        // Their slices must not overlap.
        let pool = small_pool(1, 256);
        let arena = Arc::new(pool.lease().unwrap());
        let a = Arc::clone(&arena);
        let b = Arc::clone(&arena);
        let h1 = std::thread::spawn(move || {
            let s: &mut [u8] = a.alloc::<u8>(64).unwrap();
            // Write through the slice; if the two allocations
            // overlapped, these concurrent writes would be a data race.
            for x in s.iter_mut() {
                *x = 0xAA;
            }
            (s.as_ptr() as usize, s.len())
        });
        let h2 = std::thread::spawn(move || {
            let s: &mut [u8] = b.alloc::<u8>(64).unwrap();
            for x in s.iter_mut() {
                *x = 0xBB;
            }
            (s.as_ptr() as usize, s.len())
        });
        let (p1, l1) = h1.join().unwrap();
        let (p2, l2) = h2.join().unwrap();
        // Disjoint ranges: [p1, p1+l1) and [p2, p2+l2) do not overlap.
        let no_overlap = p1 + l1 <= p2 || p2 + l2 <= p1;
        assert!(no_overlap, "concurrent alloc returned overlapping slices");
    }
701}