oxideav_core/arena/sync.rs
//! `Send + Sync` mirror of the parent [`crate::arena`] module.
//!
//! This module exposes the same four-type API ([`ArenaPool`],
//! [`Arena`], [`Frame`], [`FrameInner`]) as its sibling, with one
//! difference that ripples through the whole shape:
//!
//! - [`Arena`] uses `AtomicUsize` / `AtomicU32` for its bump cursor
//!   and allocation counter (instead of `Cell<usize>` / `Cell<u32>`),
//!   and is therefore `Send + Sync`.
//! - [`Frame`] is `Arc<FrameInner>` (instead of `Rc<FrameInner>`),
//!   so a decoded frame can be moved or shared across threads.
//! - [`FrameInner`] holds a sync [`Arena`], so it is itself `Send +
//!   Sync` and `Arc<FrameInner>: Send + Sync` falls out for free.
//!
//! ## When to use which
//!
//! Use [`crate::arena`] (the `Rc` variant) when the decoder produces
//! frames on the same thread that consumes them. The bump cursor is
//! a plain `Cell<usize>` and there are no atomic operations on the
//! hot allocation path.
//!
//! Use this module (the `Arc` variant) when the decoder hands frames
//! to a different thread — the typical case for a pipeline that
//! decodes on one worker and renders / encodes / transmits on
//! another. The cost is an atomic load + CAS per allocation and an
//! atomic refcount per frame clone; both are negligible compared to
//! the actual decode work.
//!
//! ## Concurrent allocation contract
//!
//! [`Arena::alloc`] uses a CAS loop on the cursor, so two threads
//! that both call [`Arena::alloc`] on the same `&Arena` will receive
//! disjoint slices (the loser of the CAS retries against the new
//! cursor). The returned `&mut [T]` points into a region that no
//! other in-flight `alloc()` call can also receive, and the slice's
//! lifetime is bounded by the borrow of `&self`.
//!
//! In practice the typical pattern is **one decoder thread allocates,
//! then freezes into a [`Frame`] which is shared read-only across
//! threads** — concurrent allocation is supported but rarely useful.
//! The bytes returned by [`Arena::alloc`] are zero-filled (see the
//! `Zeroable` bound on that method), so unwritten bytes read back as
//! zero; the intended pattern is still to overwrite them fully.
//!
//! Everything else (per-arena byte cap, per-arena allocation-count
//! cap, weak handle back to the pool for `Drop`-time release, the
//! `FrameHeader` shape, plane validation in [`FrameInner::new`])
//! matches the parent module exactly.
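//!
//! ## Example
//!
//! A minimal end-to-end sketch of the cross-thread case. It is fenced
//! as `ignore` because the external crate paths and the exact
//! `FrameHeader::new` signature are assumptions taken from the tests
//! at the bottom of this file:
//!
//! ```ignore
//! use oxideav_core::arena::sync::{ArenaPool, FrameHeader, FrameInner};
//! use oxideav_core::format::PixelFormat;
//!
//! // One pool for the whole decoder; one arena leased per frame.
//! let pool = ArenaPool::new(4, 1 << 20);
//! let arena = pool.lease().expect("pool has free slots");
//!
//! // Decoder side: carve a plane out of the arena and fill it.
//! let plane: &mut [u8] = arena.alloc::<u8>(16 * 16).expect("fits in cap");
//! plane.fill(0x80);
//!
//! // Freeze into a shareable frame and hand it to a worker thread.
//! let header = FrameHeader::new(16, 16, PixelFormat::Gray8, Some(0));
//! let frame = FrameInner::new(arena, &[(0, 16 * 16)], header).expect("valid plane");
//! let worker = std::thread::spawn(move || frame.plane(0).unwrap().len());
//! assert_eq!(worker.join().unwrap(), 16 * 16);
//! ```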

use std::mem::{align_of, size_of};
use std::ptr::NonNull;
use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, Weak};

use crate::error::{Error, Result};

// Re-export the shared `FrameHeader` and `MAX_PLANES` constant so
// users of either arena module see the same metadata shape — there is
// no thread-safety angle to either of them, and duplicating them
// would only add drift.
pub use super::{FrameHeader, MAX_PLANES};
// `Buffer` and `MAX_ALIGN` are shared with the parent module — the
// pool-backing storage and the soundness-critical alignment constant
// are identical for the `Rc` and `Arc` variants. See `arena/mod.rs`
// for the full soundness rationale.
use super::{Buffer, MAX_ALIGN};

/// `Send + Sync` pool of reusable byte buffers for arena-backed frame
/// allocations. Mirrors [`crate::arena::ArenaPool`] in shape and
/// behaviour; the only difference is that the [`Arena`] (and the
/// [`Frame`] holding it) handed out are themselves `Send + Sync`.
///
/// Construct via [`ArenaPool::new`]. Lease an [`Arena`] per frame via
/// [`ArenaPool::lease`]; drop the arena (or drop the last clone of a
/// [`Frame`] holding it) to return its buffer to the pool.
pub struct ArenaPool {
    inner: Mutex<PoolInner>,
    cap_per_arena: usize,
    max_arenas: usize,
    max_alloc_count_per_arena: u32,
}

struct PoolInner {
    /// Buffers currently sitting idle in the pool (ready to lease).
    idle: Vec<Buffer>,
    /// Total buffers ever allocated by this pool (idle + in-flight).
    /// Caps lazy growth at `max_arenas`.
    total_allocated: usize,
}

impl ArenaPool {
    /// Construct a new pool with `max_arenas` buffer slots, each of
    /// `cap_per_arena` bytes. Buffers are allocated lazily on first
    /// lease — a freshly constructed pool holds no memory.
    ///
    /// Per-arena allocation count is capped at a generous 1 M
    /// (override via [`ArenaPool::with_alloc_count_cap`]).
    pub fn new(max_arenas: usize, cap_per_arena: usize) -> Arc<Self> {
        Self::with_alloc_count_cap(max_arenas, cap_per_arena, 1_000_000)
    }

    /// Like [`ArenaPool::new`] but lets the caller set the per-arena
    /// allocation-count cap. Useful when the caller is plumbing
    /// [`crate::DecoderLimits`] through.
    pub fn with_alloc_count_cap(
        max_arenas: usize,
        cap_per_arena: usize,
        max_alloc_count_per_arena: u32,
    ) -> Arc<Self> {
        Arc::new(Self {
            inner: Mutex::new(PoolInner {
                idle: Vec::with_capacity(max_arenas),
                total_allocated: 0,
            }),
            cap_per_arena,
            max_arenas,
            max_alloc_count_per_arena,
        })
    }

    /// Capacity of each arena buffer this pool hands out, in bytes.
    pub fn cap_per_arena(&self) -> usize {
        self.cap_per_arena
    }

    /// Maximum number of arenas that may be checked out at once.
    pub fn max_arenas(&self) -> usize {
        self.max_arenas
    }

    /// Lease one arena from the pool. Returns
    /// [`Error::ResourceExhausted`] if every arena slot is already
    /// checked out by an [`Arena`] (or a [`Frame`] holding one).
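    ///
    /// A small exhaustion sketch (fenced as `ignore`; the external
    /// `oxideav_core` path is an assumption):
    ///
    /// ```ignore
    /// use oxideav_core::arena::sync::ArenaPool;
    ///
    /// let pool = ArenaPool::new(1, 4096);
    /// let held = pool.lease().expect("first lease succeeds");
    /// // Every slot is checked out, so a second lease fails...
    /// assert!(pool.lease().is_err());
    /// drop(held);
    /// // ...and succeeds again once the arena's buffer is back in the pool.
    /// let _again = pool.lease().expect("lease after drop");
    /// ```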
    pub fn lease(self: &Arc<Self>) -> Result<Arena> {
        let buffer = {
            let mut inner = self.inner.lock().expect("ArenaPool mutex poisoned");
            if let Some(buf) = inner.idle.pop() {
                buf
            } else if inner.total_allocated < self.max_arenas {
                inner.total_allocated += 1;
                Buffer::new_zeroed(self.cap_per_arena)
            } else {
                return Err(Error::resource_exhausted(format!(
                    "ArenaPool exhausted: all {} arenas checked out",
                    self.max_arenas
                )));
            }
        };

        let base = buffer.ptr;
        Ok(Arena {
            buffer: Mutex::new(Some(buffer)),
            base,
            cursor: AtomicUsize::new(0),
            alloc_count: AtomicU32::new(0),
            cap: self.cap_per_arena,
            alloc_count_cap: self.max_alloc_count_per_arena,
            pool: Arc::downgrade(self),
        })
    }

    /// Return a buffer to the idle list. Called from `Arena::Drop`;
    /// not part of the public API. The buffer is zeroed before being
    /// returned so the next lease starts from a clean state — this is
    /// what makes `Zeroable` a sufficient bound on `Arena::alloc<T>`
    /// across pool reuse cycles.
    fn release(&self, mut buffer: Buffer) {
        buffer.zero();
        if let Ok(mut inner) = self.inner.lock() {
            inner.idle.push(buffer);
        }
        // If the lock is poisoned, drop the buffer normally — the
        // pool is in an unusable state already.
    }
}

/// One leased buffer from an [`ArenaPool`]. `Send + Sync`.
///
/// Allocations are bump-pointer on an atomic cursor: each call to
/// [`Arena::alloc`] CAS-advances the cursor and returns a fresh
/// aligned slice carved out of the buffer at the old position. There
/// is no per-allocation header and no individual free — the entire
/// arena is reset (returned to the pool) only when the `Arena` is
/// dropped.
///
/// Concurrent calls to [`Arena::alloc`] on the same `&Arena` are
/// supported and produce disjoint slices (the CAS loser retries
/// against the new cursor). See the module docs for the full
/// concurrency contract.
pub struct Arena {
    /// Backing buffer leased from the pool. Stored in a `Mutex` so
    /// `Drop` can `take()` the buffer without needing direct
    /// `UnsafeCell` access (which would re-borrow the whole storage
    /// and invalidate previously-returned slices under stacked
    /// borrows). Outside of `Drop` this is always `Some` — the
    /// mutex itself is essentially uncontended (nothing else touches
    /// it on the hot path).
    ///
    /// We never re-borrow this buffer mutably while handing out
    /// slices from it — the typed pointers returned by `alloc` are
    /// derived from the cached raw `base` pointer below, never from
    /// a fresh borrow of the whole storage. This is what avoids the
    /// stacked-borrows whole-buffer-retag race that Miri reported
    /// when two threads called `alloc` concurrently while a third
    /// held a previously-returned `&mut [T]`.
    buffer: Mutex<Option<Buffer>>,
    /// Cached base pointer of `buffer` (a [`MAX_ALIGN`]-aligned
    /// allocation owned by `buffer`). Stable for the lifetime of the
    /// arena: `Buffer` does not move its allocation, and we only take
    /// `buffer` out of the mutex during `Drop` after no allocator
    /// activity remains. All `alloc` calls derive their typed
    /// pointers from `base.as_ptr().add(offset)`.
    base: NonNull<u8>,
    /// Atomic bump cursor: the next free byte offset within the
    /// buffer.
    cursor: AtomicUsize,
    /// Atomic allocation counter.
    alloc_count: AtomicU32,
    /// Cached cap (== `pool.cap_per_arena` at lease time).
    cap: usize,
    /// Cached cap (== `pool.max_alloc_count_per_arena` at lease time).
    alloc_count_cap: u32,
    /// Weak handle back to the pool so `Drop` can return the buffer.
    pool: Weak<ArenaPool>,
}

// SAFETY: `Arena` owns its buffer's allocation outright (no shared
// ownership), all cursor/count mutations go through atomics, and the
// raw `base` pointer is only used to derive disjoint typed slices
// whose ranges the CAS loop guarantees not to overlap. The `Drop`
// path takes the buffer out of the mutex under `&mut self` — no
// other thread can be in `alloc` at that point.
unsafe impl Send for Arena {}
// SAFETY: `Arena::alloc`, which takes `&self`, mutates only via the
// atomic cursor and the allocation counter (themselves `Sync`) and
// writes into a region of the buffer that no other in-flight call
// has been handed (CAS guarantees disjoint regions). The raw `base`
// pointer is never used to materialise a whole-buffer mutable
// borrow, so a new `alloc` call cannot invalidate any other thread's
// previously returned `&mut [T]` slice under stacked borrows.
unsafe impl Sync for Arena {}

impl Arena {
    /// Capacity of this arena in bytes.
    pub fn capacity(&self) -> usize {
        self.cap
    }

    /// Bytes consumed by allocations so far.
    pub fn used(&self) -> usize {
        self.cursor.load(Ordering::Acquire)
    }

    /// Number of allocations performed so far.
    pub fn alloc_count(&self) -> u32 {
        self.alloc_count.load(Ordering::Acquire)
    }

    /// `true` once the per-arena allocation-count cap has been
    /// reached. Decoders that produce many small allocations should
    /// poll this and bail with [`Error::ResourceExhausted`] when it
    /// flips, instead of waiting for the next [`Arena::alloc`] call
    /// to fail.
    pub fn alloc_count_exceeded(&self) -> bool {
        self.alloc_count.load(Ordering::Acquire) >= self.alloc_count_cap
    }

    /// Allocate `count` `T`s out of this arena. Returns a borrowed
    /// `&mut [T]` (lifetime bounded by the borrow of `self`).
    ///
    /// The returned slice points at zero-filled bytes (the pool
    /// zero-fills on initial allocation and again whenever a buffer
    /// is returned). The `Zeroable` bound on `T` guarantees that an
    /// all-zero bit pattern is a valid value for `T`, so reading the
    /// slice without first writing it is sound. The intended pattern
    /// is still "decoder fills the slice, then reads back what it
    /// wrote" — but unwritten bytes will read back as `T::zeroed()`
    /// rather than as UB.
    ///
    /// Returns [`Error::ResourceExhausted`] if either the per-arena
    /// byte cap or the per-arena allocation-count cap would be
    /// exceeded.
    ///
    /// # Type bounds
    ///
    /// - `T: bytemuck::Zeroable` — pool buffers are zero-filled, so
    ///   handing back `&mut [T]` over those bytes is only sound when
    ///   the all-zero bit pattern is valid for `T`. This rules out
    ///   `NonZeroU8`/`NonZeroU16`/…/references/function pointers/
    ///   niche-optimised enums.
    /// - `align_of::<T>() <= MAX_ALIGN` — checked at compile time via
    ///   a `const` assertion. The pool buffer's base pointer is
    ///   aligned to [`MAX_ALIGN`] (= 64 bytes); per-`T` alignment is
    ///   then a relative-offset adjustment of the bump cursor.
    /// - The arena does not run destructors on allocated values, so
    ///   `T` should not have meaningful `Drop` glue.
    ///
    /// **Concurrency:** the bump cursor is advanced via a CAS loop,
    /// so concurrent `alloc` calls on the same `&Arena` produce
    /// disjoint slices. The CAS loser retries against the new
    /// cursor; in the uncontended case the cost is a single atomic
    /// load plus one successful CAS. Crucially, no `alloc` call
    /// re-borrows the whole buffer (the typed pointer is derived
    /// from the cached raw base pointer), so concurrent allocators
    /// cannot invalidate each other's previously-returned slices
    /// under stacked borrows.
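    ///
    /// A usage sketch (fenced as `ignore`; the pool construction and
    /// the external crate path are assumptions):
    ///
    /// ```ignore
    /// use oxideav_core::arena::sync::ArenaPool;
    ///
    /// let pool = ArenaPool::new(1, 1024);
    /// let arena = pool.lease().expect("lease");
    /// // `u16` is `Zeroable`, so the fresh slice reads back as zeros.
    /// let luma: &mut [u16] = arena.alloc::<u16>(64).expect("128 bytes fit");
    /// assert!(luma.iter().all(|&px| px == 0));
    /// luma[0] = 1023;
    /// // Blowing the byte cap is an `Err`, not a panic.
    /// assert!(arena.alloc::<u8>(2048).is_err());
    /// ```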
    #[allow(clippy::mut_from_ref)] // see "Concurrency" doc above.
    pub fn alloc<T>(&self, count: usize) -> Result<&mut [T]>
    where
        T: bytemuck::Zeroable,
    {
        // Compile-time check: T's alignment must not exceed the
        // pool buffer's base alignment. Doing this as a const-eval'd
        // assert means a violating monomorphisation fails the build.
        const fn assert_align<T>() {
            assert!(
                align_of::<T>() <= MAX_ALIGN,
                "Arena::alloc<T>: align_of::<T>() exceeds MAX_ALIGN; \
                 increase MAX_ALIGN in arena/mod.rs"
            );
        }
        const { assert_align::<T>() };

        // Allocation-count cap. Increment first; if we overshoot,
        // roll back so subsequent calls still see the correct value.
        let prev_count = self.alloc_count.fetch_add(1, Ordering::AcqRel);
        if prev_count >= self.alloc_count_cap {
            // Roll back so `alloc_count_exceeded()` keeps returning
            // a stable cap value rather than drifting upward.
            self.alloc_count.fetch_sub(1, Ordering::AcqRel);
            return Err(Error::resource_exhausted(format!(
                "Arena alloc-count cap of {} exceeded",
                self.alloc_count_cap
            )));
        }

        let elem_size = size_of::<T>();
        let elem_align = align_of::<T>();
        // Bytes requested.
        let bytes = elem_size.checked_mul(count).ok_or_else(|| {
            // Roll back the alloc-count bump on size-overflow too.
            self.alloc_count.fetch_sub(1, Ordering::AcqRel);
            Error::resource_exhausted("Arena alloc size overflow".to_string())
        })?;

        // CAS loop on the cursor. We compute the aligned offset and
        // the new cursor from the latest observed cursor value, then
        // attempt to claim that range; if another thread won the
        // race, retry against the updated cursor.
        let mut current = self.cursor.load(Ordering::Acquire);
        let aligned;
        loop {
            let candidate_aligned = match align_up(current, elem_align) {
                Some(a) => a,
                None => {
                    self.alloc_count.fetch_sub(1, Ordering::AcqRel);
                    return Err(Error::resource_exhausted(
                        "Arena cursor alignment overflow".to_string(),
                    ));
                }
            };
            let candidate_new = match candidate_aligned.checked_add(bytes) {
                Some(n) => n,
                None => {
                    self.alloc_count.fetch_sub(1, Ordering::AcqRel);
                    return Err(Error::resource_exhausted(
                        "Arena cursor advance overflow".to_string(),
                    ));
                }
            };

            if candidate_new > self.cap {
                self.alloc_count.fetch_sub(1, Ordering::AcqRel);
                return Err(Error::resource_exhausted(format!(
                    "Arena cap of {} bytes exceeded (would consume {} bytes)",
                    self.cap, candidate_new
                )));
            }

            match self.cursor.compare_exchange_weak(
                current,
                candidate_new,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => {
                    aligned = candidate_aligned;
                    break;
                }
                Err(observed) => {
                    current = observed;
                    // Retry with the freshly observed cursor.
                }
            }
        }

        // SAFETY:
        //
        // - `self.base` points to a `MAX_ALIGN`-aligned allocation of
        //   `self.cap` bytes owned by the `Buffer` inside
        //   `self.buffer`, which lives at least as long as `&self`.
        // - We just CAS-claimed the byte range `aligned..aligned + bytes`,
        //   so no other in-flight `alloc` call can claim any byte
        //   inside it (the cursor is monotonically non-decreasing
        //   under successful CAS, so a subsequent winner observes a
        //   `current` at or past our claimed end).
        // - `aligned + count * size_of::<T>() <= self.cap` (just
        //   checked above), so the byte range is in-bounds of the
        //   allocation.
        // - `aligned` is a multiple of `align_of::<T>()` and `MAX_ALIGN
        //   >= align_of::<T>()` (compile-time assert above), so `base
        //   + aligned` is `T`-aligned (true even for `count == 0`).
        // - We derive the typed pointer from `self.base.as_ptr()`, not
        //   from a fresh borrow of the whole buffer, so this slice
        //   does not invalidate any other thread's previously
        //   returned `&mut [T]` under stacked borrows.
        // - `T: Zeroable` and the buffer bytes are zero, so the
        //   `&mut [T]` references valid `T` values.
        let slice: &mut [T] = unsafe {
            let elem_ptr = self.base.as_ptr().add(aligned).cast::<T>();
            std::slice::from_raw_parts_mut(elem_ptr, count)
        };

        Ok(slice)
    }

    /// Reset the arena to empty without releasing its buffer to the
    /// pool. Useful for a decoder that wants to reuse the same arena
    /// across several intermediate stages of the same frame. Callers
    /// must ensure no slice previously returned from [`Arena::alloc`]
    /// is still in use — Rust's borrow checker enforces this, since
    /// `reset` takes `&mut self`. The buffer is re-zeroed so the
    /// zero-fill guarantee documented on [`Arena::alloc`] keeps
    /// holding after a reset.
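    ///
    /// A reuse sketch (fenced as `ignore`; the external path is an
    /// assumption):
    ///
    /// ```ignore
    /// let pool = oxideav_core::arena::sync::ArenaPool::new(1, 64);
    /// let mut arena = pool.lease().expect("lease");
    /// arena.alloc::<u8>(64).expect("fills the arena");
    /// assert!(arena.alloc::<u8>(1).is_err()); // full
    /// arena.reset(); // exclusive access, so no outstanding slices
    /// arena.alloc::<u8>(64).expect("space reclaimed without a pool round-trip");
    /// ```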
    pub fn reset(&mut self) {
        // `&mut self` proves exclusive access; non-atomic stores
        // would suffice, but the atomic API is uniform. Re-zero the
        // buffer first so `alloc`'s zero-fill / `Zeroable` soundness
        // argument keeps holding across resets.
        if let Ok(slot) = self.buffer.get_mut() {
            if let Some(buffer) = slot.as_mut() {
                buffer.zero();
            }
        }
        self.cursor.store(0, Ordering::Release);
        self.alloc_count.store(0, Ordering::Release);
    }
}

impl Drop for Arena {
    fn drop(&mut self) {
        // We're in Drop with `&mut self`, so no `alloc`-returned
        // slices can still be borrowing from `base` and no other
        // thread can be in `alloc`. Take the buffer out of the mutex
        // and either return it to the pool or let it free here.
        let taken = self.buffer.get_mut().ok().and_then(|slot| slot.take());
        if let Some(buffer) = taken {
            if let Some(pool) = self.pool.upgrade() {
                pool.release(buffer);
            } else {
                // Pool was dropped before us — buffer drops here and
                // its allocation is freed via `Buffer::Drop`.
                drop(buffer);
            }
        }
    }
}

/// Round `n` up to the next multiple of `align`. `align` must be a
/// power of two. Returns `None` on overflow.
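///
/// A couple of worked cases (shown for documentation only; `align_up`
/// is private, so this block is not doctested):
///
/// ```ignore
/// assert_eq!(align_up(5, 4), Some(8));        // rounds up to the next multiple
/// assert_eq!(align_up(8, 4), Some(8));        // already aligned: unchanged
/// assert_eq!(align_up(usize::MAX, 64), None); // n + (align - 1) overflows
/// ```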
fn align_up(n: usize, align: usize) -> Option<usize> {
    debug_assert!(align.is_power_of_two(), "alignment must be a power of two");
    let mask = align - 1;
    n.checked_add(mask).map(|m| m & !mask)
}

/// The owned body of a refcounted [`Frame`]. `Send + Sync`.
///
/// Holds a [`sync::Arena`](Arena) (the bytes), a fixed-size table of
/// `(offset_in_arena, length_in_bytes)` pairs (one per plane), and a
/// [`FrameHeader`]. The `plane_count` field tracks how many entries
/// of `plane_offsets` are actually populated. Up to [`MAX_PLANES`]
/// planes are supported.
///
/// **Lifetime:** an [`Arena`] returns its buffer to the pool when
/// dropped. An `Arc<FrameInner>` keeps the arena alive via its single
/// owned field, so as long as any clone of a [`Frame`] exists the
/// underlying buffer stays out of the pool.
pub struct FrameInner {
    arena: Arena,
    plane_offsets: [(usize, usize); MAX_PLANES],
    plane_count: u8,
    header: FrameHeader,
}

/// Refcounted handle to a decoded video frame. `Send + Sync`.
///
/// Construct via [`FrameInner::new`]; clone freely (each clone bumps
/// the atomic refcount by 1). The arena and its buffer are released
/// back to the pool when the last clone is dropped.
///
/// Use this type when the decoder hands frames to a different thread
/// from the one that produced them. For same-thread decode/consume,
/// the cheaper [`crate::arena::Frame`] (`Rc`-backed) is preferable.
pub type Frame = Arc<FrameInner>;

impl FrameInner {
    /// Construct a `Frame` (`Arc<FrameInner>`) from an arena, a slice
    /// of `(offset, length)` plane descriptors, and a header. Returns
    /// [`Error::InvalidData`] if more than [`MAX_PLANES`] planes are
    /// supplied or if any plane range falls outside the arena's used
    /// region.
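    ///
    /// A validation sketch (fenced as `ignore`; the external crate
    /// paths and the `PixelFormat::Gray8` variant are assumptions
    /// taken from the tests below):
    ///
    /// ```ignore
    /// use oxideav_core::arena::sync::{ArenaPool, FrameHeader, FrameInner};
    /// use oxideav_core::format::PixelFormat;
    ///
    /// let pool = ArenaPool::new(1, 64);
    /// let arena = pool.lease().expect("lease");
    /// arena.alloc::<u8>(16).expect("one 16-byte plane");
    /// let header = FrameHeader::new(4, 4, PixelFormat::Gray8, None);
    /// // (0, 32) reaches past the 16 bytes actually allocated, so it is rejected.
    /// assert!(FrameInner::new(arena, &[(0, 32)], header).is_err());
    /// ```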
    pub fn new(arena: Arena, planes: &[(usize, usize)], header: FrameHeader) -> Result<Frame> {
        if planes.len() > MAX_PLANES {
            return Err(Error::invalid(format!(
                "FrameInner supports at most {} planes (got {})",
                MAX_PLANES,
                planes.len()
            )));
        }
        let used = arena.used();
        for (i, (off, len)) in planes.iter().enumerate() {
            let end = off
                .checked_add(*len)
                .ok_or_else(|| Error::invalid(format!("plane {i}: offset+len overflow")))?;
            if end > used {
                return Err(Error::invalid(format!(
                    "plane {i}: range {off}..{end} exceeds arena used={used}"
                )));
            }
        }
        let mut plane_offsets = [(0usize, 0usize); MAX_PLANES];
        for (i, p) in planes.iter().enumerate() {
            plane_offsets[i] = *p;
        }
        Ok(Arc::new(FrameInner {
            arena,
            plane_offsets,
            plane_count: planes.len() as u8,
            header,
        }))
    }

    /// Number of planes this frame holds.
    pub fn plane_count(&self) -> usize {
        self.plane_count as usize
    }

    /// Read-only access to plane `i`. Returns `None` if `i` is out of
    /// range.
    pub fn plane(&self, i: usize) -> Option<&[u8]> {
        if i >= self.plane_count as usize {
            return None;
        }
        let (off, len) = self.plane_offsets[i];
        // SAFETY:
        // - plane ranges were validated against `arena.used()` at
        //   construction (`off + len <= arena.cursor`), and the
        //   cursor only advances, so the byte range is in-bounds.
        // - The bytes were written by `alloc` and never moved (the
        //   buffer's allocation is stable for the arena's lifetime).
        // - We derive the slice from the raw base pointer, never via
        //   a re-borrow of the whole buffer, so this `&[u8]` does not
        //   invalidate any other slice the caller is holding.
        // - The borrow lifetime is bounded by `&self`.
        let buf: &[u8] = unsafe {
            let elem_ptr = self.arena.base.as_ptr().add(off);
            std::slice::from_raw_parts(elem_ptr, len)
        };
        Some(buf)
    }

    /// Frame header (width / height / pixel format / pts).
    pub fn header(&self) -> &FrameHeader {
        &self.header
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::format::PixelFormat;

    fn assert_send_sync<T: Send + Sync>() {}

    #[test]
    fn types_are_send_sync() {
        // The whole point of this module: prove the public types
        // satisfy the cross-thread contract that `crate::arena` does
        // not.
        assert_send_sync::<ArenaPool>();
        assert_send_sync::<Arc<ArenaPool>>();
        assert_send_sync::<Arena>();
        assert_send_sync::<FrameInner>();
        assert_send_sync::<Frame>();
    }

    fn small_pool(slots: usize, cap: usize) -> Arc<ArenaPool> {
        ArenaPool::new(slots, cap)
    }

    #[test]
    fn pool_lease_returns_err_when_exhausted() {
        let pool = small_pool(2, 1024);
        let a = pool.lease().expect("first lease");
        let b = pool.lease().expect("second lease");
        let third = pool.lease();
        assert!(matches!(third, Err(Error::ResourceExhausted(_))));
        drop((a, b));
    }

    #[test]
    fn arena_alloc_caps_at_size_limit() {
        let pool = small_pool(1, 64);
        let arena = pool.lease().unwrap();
        let _: &mut [u8] = arena.alloc::<u8>(32).unwrap();
        let _: &mut [u8] = arena.alloc::<u8>(32).unwrap();
        let third = arena.alloc::<u8>(1);
        assert!(matches!(third, Err(Error::ResourceExhausted(_))));
    }

    #[test]
    fn arena_alloc_count_cap_fires() {
        let pool = ArenaPool::with_alloc_count_cap(1, 1024, 3);
        let arena = pool.lease().unwrap();
        let _: &mut [u8] = arena.alloc::<u8>(1).unwrap();
        let _: &mut [u8] = arena.alloc::<u8>(1).unwrap();
        let _: &mut [u8] = arena.alloc::<u8>(1).unwrap();
        assert!(arena.alloc_count_exceeded());
        let fourth = arena.alloc::<u8>(1);
        assert!(matches!(fourth, Err(Error::ResourceExhausted(_))));
        // Counter must remain at the cap even after a refused alloc
        // — no drift from the rollback path.
        assert_eq!(arena.alloc_count(), 3);
    }

    #[test]
    fn arena_returns_to_pool_on_drop() {
        let pool = small_pool(1, 256);
        {
            let arena = pool.lease().expect("first lease");
            assert!(matches!(pool.lease(), Err(Error::ResourceExhausted(_))));
            drop(arena);
        }
        let _again = pool.lease().expect("re-lease after drop");
    }

    #[test]
    fn arena_alignment_is_respected() {
        let pool = small_pool(1, 64);
        let arena = pool.lease().unwrap();
        let _: &mut [u8] = arena.alloc::<u8>(1).unwrap();
        let s: &mut [u32] = arena.alloc::<u32>(4).unwrap();
        let addr = s.as_ptr() as usize;
        assert_eq!(addr % align_of::<u32>(), 0);
        assert_eq!(s.len(), 4);
    }

    #[cfg(miri)]
    #[test]
    fn arena_alloc_can_return_misaligned_typed_slice() {
        let pool = small_pool(1, 0);
        let arena = pool.lease().unwrap();

        // Regression check for the pre-fix alignment bug: the backing
        // allocation used to be a plain `Box<[u8]>`, so its base
        // pointer was only byte-aligned and `alloc::<T>` aligned only
        // the byte offset, not the absolute address. The empty-buffer
        // case made that deterministic: even an empty `&mut [u32]`
        // needs an aligned pointer. Post-fix the `MAX_ALIGN`-aligned
        // `Buffer` base keeps this sound, so this must pass under Miri.
        let _s: &mut [u32] = arena.alloc::<u32>(0).unwrap();
    }

    // Pre-fix this test was:
    //
    //     let values = arena.alloc::<std::num::NonZeroU8>(1).unwrap();
    //     let _ = values[0].get();
    //
    // and failed under Miri because pool buffers are zero-filled and
    // zero is not a valid `NonZeroU8`. Post-fix, the `Zeroable` bound
    // on `Arena::alloc` makes that call a hard *compile* error — the
    // strongest possible enforcement. The test below is a regression
    // assertion that the bound stays as-or-stricter than `Zeroable`:
    // if a future refactor weakened it back to `Copy`, the
    // commented-out call site would start compiling again and Miri
    // would once again flag the invalid bit pattern.
    #[cfg(miri)]
    #[test]
    fn arena_alloc_allows_invalid_bit_patterns_for_copy_types() {
        // `requires_zeroable::<NonZeroU8>()` would not compile —
        // `NonZeroU8: !Zeroable`. Sanity-check the helper itself with
        // a known zeroable type so the test is an actual exercise.
        fn requires_zeroable<T: bytemuck::Zeroable>() {}
        requires_zeroable::<u8>();
        // Uncommenting the next line must fail to compile:
        // requires_zeroable::<std::num::NonZeroU8>();
    }

    #[cfg(miri)]
    #[test]
    fn arena_alloc_second_slice_invalidates_first_mut_reference() {
        let pool = small_pool(1, 2);
        let arena = pool.lease().unwrap();

        // Regression check for the pre-fix stacked-borrows bug: `alloc`
        // used to call `[u8]::as_mut_ptr` on the whole backing slice,
        // materialising a fresh mutable borrow of the whole buffer and
        // invalidating previously returned `&mut` slices even when the
        // byte ranges were disjoint. Post-fix, slices come from the
        // cached `base` pointer, so writing through both must pass Miri.
        let first = arena.alloc::<u8>(1).unwrap();
        let second = arena.alloc::<u8>(1).unwrap();
        first[0] = 1;
        second[0] = 2;
    }

    fn build_simple_frame(pool: &Arc<ArenaPool>) -> Frame {
        let arena = pool.lease().unwrap();
        let plane0: &mut [u8] = arena.alloc::<u8>(16).unwrap();
        for (i, b) in plane0.iter_mut().enumerate() {
            *b = i as u8;
        }
        let header = FrameHeader::new(4, 4, PixelFormat::Gray8, Some(42));
        FrameInner::new(arena, &[(0, 16)], header).unwrap()
    }

    #[test]
    fn frame_refcount_keeps_arena_alive() {
        let pool = small_pool(1, 256);
        let frame = build_simple_frame(&pool);
        let clone = Arc::clone(&frame);
        drop(frame);
        let plane = clone.plane(0).expect("plane 0");
        assert_eq!(plane.len(), 16);
        for (i, b) in plane.iter().enumerate() {
            assert_eq!(*b, i as u8);
        }
        assert_eq!(clone.header().width, 4);
        assert_eq!(clone.header().height, 4);
        assert_eq!(clone.header().presentation_timestamp, Some(42));
        assert!(matches!(pool.lease(), Err(Error::ResourceExhausted(_))));
    }

    #[test]
    fn last_drop_returns_arena_to_pool() {
        let pool = small_pool(1, 256);
        let frame = build_simple_frame(&pool);
        let clone = Arc::clone(&frame);
        drop(frame);
        drop(clone);
        let _again = pool.lease().expect("lease after last drop");
    }

    #[test]
    fn frame_rejects_too_many_planes() {
        let pool = small_pool(1, 256);
        let arena = pool.lease().unwrap();
        let header = FrameHeader::new(1, 1, PixelFormat::Gray8, None);
        let too_many = vec![(0usize, 0usize); MAX_PLANES + 1];
        let r = FrameInner::new(arena, &too_many, header);
        assert!(matches!(r, Err(Error::InvalidData(_))));
    }

    #[test]
    fn frame_rejects_plane_outside_arena() {
        let pool = small_pool(1, 64);
        let arena = pool.lease().unwrap();
        let header = FrameHeader::new(1, 1, PixelFormat::Gray8, None);
        let r = FrameInner::new(arena, &[(0, 16)], header);
        assert!(matches!(r, Err(Error::InvalidData(_))));
    }

    #[test]
    fn pool_outlives_buffer_drop_when_pool_dropped_first() {
        let pool = small_pool(1, 64);
        let arena = pool.lease().unwrap();
        drop(pool);
        drop(arena);
    }

    #[test]
    fn arena_reset_clears_allocations() {
        let pool = small_pool(1, 32);
        let mut arena = pool.lease().unwrap();
        let _: &mut [u8] = arena.alloc::<u8>(32).unwrap();
        assert!(matches!(
            arena.alloc::<u8>(1),
            Err(Error::ResourceExhausted(_))
        ));
        arena.reset();
        let _: &mut [u8] = arena.alloc::<u8>(32).unwrap();
    }

    #[test]
    fn frame_can_be_sent_across_thread_boundary() {
        // Build a frame on this thread, ship it to a worker thread,
        // read its bytes there. This is the use case the module
        // exists to enable; if it ever stops compiling, the
        // `Send + Sync` impls above are wrong.
        let pool = small_pool(1, 256);
        let frame = build_simple_frame(&pool);
        let frame_for_worker = Arc::clone(&frame);
        let handle = std::thread::spawn(move || {
            let plane = frame_for_worker.plane(0).expect("plane 0 on worker");
            let mut sum: u32 = 0;
            for b in plane {
                sum += *b as u32;
            }
            sum
        });
        let sum = handle.join().expect("worker joined");
        // Plane was filled with 0..16, sum = 120.
        assert_eq!(sum, (0..16u32).sum::<u32>());
        // Original frame still readable here too.
        assert_eq!(frame.plane(0).unwrap().len(), 16);
    }

    #[test]
    fn concurrent_alloc_produces_disjoint_slices() {
        // Two threads alloc 64 bytes each from a 256-byte arena.
        // Their slices must not overlap.
        let pool = small_pool(1, 256);
        let arena = Arc::new(pool.lease().unwrap());
        let a = Arc::clone(&arena);
        let b = Arc::clone(&arena);
        let h1 = std::thread::spawn(move || {
            let s: &mut [u8] = a.alloc::<u8>(64).unwrap();
            // Fill so we can detect overlap from the other thread.
            for x in s.iter_mut() {
                *x = 0xAA;
            }
            (s.as_ptr() as usize, s.len())
        });
        let h2 = std::thread::spawn(move || {
            let s: &mut [u8] = b.alloc::<u8>(64).unwrap();
            for x in s.iter_mut() {
                *x = 0xBB;
            }
            (s.as_ptr() as usize, s.len())
        });
        let (p1, l1) = h1.join().unwrap();
        let (p2, l2) = h2.join().unwrap();
        // Disjoint ranges: [p1, p1+l1) and [p2, p2+l2) do not overlap.
        let no_overlap = p1 + l1 <= p2 || p2 + l2 <= p1;
        assert!(no_overlap, "concurrent alloc returned overlapping slices");
    }

    #[cfg(miri)]
    #[test]
    fn concurrent_alloc_retags_whole_buffer_while_other_thread_writes() {
        // Regression check for the pre-fix retag race: `Arena` is
        // `Sync`, so safe code can call `alloc` while another thread
        // writes through a previously returned slice. The CAS cursor
        // keeps the byte ranges disjoint, but `alloc` used to
        // materialise a mutable borrow of the whole buffer via
        // `[u8]::as_mut_ptr`, which Miri reported as a data race with
        // the other thread's write. Post-fix the typed pointer comes
        // from the cached `base`, so this must pass under Miri.
        let pool = small_pool(1, 256);
        let arena = Arc::new(pool.lease().unwrap());
        let barrier = Arc::new(std::sync::Barrier::new(2));
        let a = Arc::clone(&arena);
        let b = Arc::clone(&arena);
        let barrier_a = Arc::clone(&barrier);
        let barrier_b = Arc::clone(&barrier);
        let h1 = std::thread::spawn(move || {
            let s: &mut [u8] = a.alloc::<u8>(64).unwrap();
            barrier_a.wait();
            for x in s.iter_mut() {
                *x = 0xAA;
            }
        });
        let h2 = std::thread::spawn(move || {
            barrier_b.wait();
            let s: &mut [u8] = b.alloc::<u8>(64).unwrap();
            for x in s.iter_mut() {
                *x = 0xBB;
            }
        });
        h1.join().unwrap();
        h2.join().unwrap();
    }
873}