oxideav_core/arena/sync.rs
1//! `Send + Sync` mirror of the parent [`crate::arena`] module.
2//!
3//! This module exposes the same four-type API ([`ArenaPool`],
4//! [`Arena`], [`Frame`], [`FrameInner`]) as its sibling, with one
5//! difference that ripples through the whole shape:
6//!
7//! - [`Arena`] uses `AtomicUsize` / `AtomicU32` for its bump cursor
8//! and allocation counter (instead of `Cell<usize>` / `Cell<u32>`),
9//! and is therefore `Send + Sync`.
10//! - [`Frame`] is `Arc<FrameInner>` (instead of `Rc<FrameInner>`),
11//! so a decoded frame can be moved or shared across threads.
12//! - [`FrameInner`] holds a sync [`Arena`], so it is itself `Send +
13//! Sync` and `Arc<FrameInner>: Send + Sync` falls out for free.
14//!
15//! ## When to use which
16//!
17//! Use [`crate::arena`] (the `Rc` variant) when the decoder produces
18//! frames on the same thread that consumes them. The bump cursor is
19//! a plain `Cell<usize>` and there are no atomic operations on the
20//! hot allocation path.
21//!
22//! Use this module (the `Arc` variant) when the decoder hands frames
23//! to a different thread — the typical case for a pipeline that
24//! decodes on one worker and renders / encodes / transmits on
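//! another. The cost is one atomic update of the allocation counter
//! plus an atomic load + CAS on the cursor per allocation (all with
//! acquire/release ordering), and an atomic refcount bump per frame
//! clone; all of it is negligible compared to the actual decode work.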
28//!
29//! ## Concurrent allocation contract
30//!
31//! [`Arena::alloc`] uses a CAS loop on the cursor, so two threads
32//! that both call [`Arena::alloc`] on the same `&Arena` will receive
33//! disjoint slices (the loser of the CAS retries against the new
34//! cursor). The returned `&mut [T]` points into a region that no
35//! other in-flight `alloc()` call can also receive, and the slice's
36//! lifetime is bounded by the borrow of `&self`.
37//!
38//! In practice the typical pattern is **one decoder thread allocates,
39//! then freezes into a [`Frame`] which is shared read-only across
40//! threads** — concurrent allocation is supported but rarely useful.
41//! The bytes returned by [`Arena::alloc`] are not zero-initialised;
42//! callers must fully overwrite them before reading.
43//!
44//! Everything else (per-arena byte cap, per-arena allocation-count
45//! cap, weak handle back to the pool for `Drop`-time release, the
46//! `FrameHeader` shape, plane validation in [`FrameInner::new`])
47//! matches the parent module exactly.
48
49use std::cell::UnsafeCell;
50use std::mem::{align_of, size_of};
51use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
52use std::sync::{Arc, Mutex, Weak};
53
54use crate::error::{Error, Result};
55
56// Re-export the shared `FrameHeader` and `MAX_PLANES` constant so
57// users of either arena module see the same metadata shape — there is
58// no thread-safety angle to either of them, and duplicating them
59// would only add drift.
60pub use super::{FrameHeader, MAX_PLANES};
61
62/// `Send + Sync` pool of reusable byte buffers for arena-backed frame
63/// allocations. Mirrors [`crate::arena::ArenaPool`] in shape and
64/// behaviour; the only difference is that the [`Arena`] (and the
65/// [`Frame`] holding it) handed out are themselves `Send + Sync`.
66///
67/// Construct via [`ArenaPool::new`]. Lease an [`Arena`] per frame via
68/// [`ArenaPool::lease`]; drop the arena (or drop the last clone of a
69/// [`Frame`] holding it) to return its buffer to the pool.
70pub struct ArenaPool {
71 inner: Mutex<PoolInner>,
72 cap_per_arena: usize,
73 max_arenas: usize,
74 max_alloc_count_per_arena: u32,
75}
76
/// Mutable bookkeeping of an `ArenaPool`, guarded by the pool's mutex.
struct PoolInner {
    /// Buffers that have been returned and are waiting to be leased
    /// again.
    idle: Vec<Box<[u8]>>,
    /// How many buffers this pool has created so far, counting both
    /// idle and checked-out ones. Lazy growth stops once this reaches
    /// `max_arenas`.
    total_allocated: usize,
}
84
85impl ArenaPool {
86 /// Construct a new pool with `max_arenas` buffer slots, each of
87 /// `cap_per_arena` bytes. Buffers are allocated lazily on first
88 /// lease — a freshly constructed pool holds no memory.
89 ///
90 /// Per-arena allocation count is capped at a generous 1 M
91 /// (override via [`ArenaPool::with_alloc_count_cap`]).
92 pub fn new(max_arenas: usize, cap_per_arena: usize) -> Arc<Self> {
93 Self::with_alloc_count_cap(max_arenas, cap_per_arena, 1_000_000)
94 }
95
96 /// Like [`ArenaPool::new`] but lets the caller set the per-arena
97 /// allocation-count cap. Useful when the caller is plumbing
98 /// [`crate::DecoderLimits`] through.
99 pub fn with_alloc_count_cap(
100 max_arenas: usize,
101 cap_per_arena: usize,
102 max_alloc_count_per_arena: u32,
103 ) -> Arc<Self> {
104 Arc::new(Self {
105 inner: Mutex::new(PoolInner {
106 idle: Vec::with_capacity(max_arenas),
107 total_allocated: 0,
108 }),
109 cap_per_arena,
110 max_arenas,
111 max_alloc_count_per_arena,
112 })
113 }
114
115 /// Capacity of each arena buffer this pool hands out, in bytes.
116 pub fn cap_per_arena(&self) -> usize {
117 self.cap_per_arena
118 }
119
120 /// Maximum number of arenas that may be checked out at once.
121 pub fn max_arenas(&self) -> usize {
122 self.max_arenas
123 }
124
125 /// Lease one arena from the pool. Returns
126 /// [`Error::ResourceExhausted`] if every arena slot is already
127 /// checked out by an [`Arena`] (or a [`Frame`] holding one).
128 pub fn lease(self: &Arc<Self>) -> Result<Arena> {
129 let buffer = {
130 let mut inner = self.inner.lock().expect("ArenaPool mutex poisoned");
131 if let Some(buf) = inner.idle.pop() {
132 buf
133 } else if inner.total_allocated < self.max_arenas {
134 inner.total_allocated += 1;
135 vec![0u8; self.cap_per_arena].into_boxed_slice()
136 } else {
137 return Err(Error::resource_exhausted(format!(
138 "ArenaPool exhausted: all {} arenas checked out",
139 self.max_arenas
140 )));
141 }
142 };
143
144 Ok(Arena {
145 buffer: UnsafeCell::new(buffer),
146 cursor: AtomicUsize::new(0),
147 alloc_count: AtomicU32::new(0),
148 cap: self.cap_per_arena,
149 alloc_count_cap: self.max_alloc_count_per_arena,
150 pool: Arc::downgrade(self),
151 })
152 }
153
154 /// Return a buffer to the idle list. Called from `Arena::Drop`;
155 /// not part of the public API.
156 fn release(&self, buffer: Box<[u8]>) {
157 if let Ok(mut inner) = self.inner.lock() {
158 inner.idle.push(buffer);
159 }
160 // If the lock is poisoned, drop the buffer normally — the
161 // pool is in an unusable state already.
162 }
163}
164
165/// One leased buffer from a [`ArenaPool`]. `Send + Sync`.
166///
167/// Allocations are bump-pointer on an atomic cursor: each call to
168/// [`Arena::alloc`] CAS-advances the cursor and returns a fresh
169/// aligned slice carved out of the buffer at the old position. There
170/// is no per-allocation header and no individual free — the entire
171/// arena is reset (returned to the pool) only when the `Arena` is
172/// dropped.
173///
174/// Concurrent calls to [`Arena::alloc`] on the same `&Arena` are
175/// supported and produce disjoint slices (the CAS loser retries
176/// against the new cursor). See the module docs for the full
177/// concurrency contract.
178pub struct Arena {
179 /// Backing buffer leased from the pool.
180 ///
181 /// Wrapped in `UnsafeCell` because `&Arena::alloc` returns
182 /// `&mut [T]` slices that borrow into non-overlapping regions of
183 /// the same buffer. The atomic cursor below guarantees the
184 /// regions returned by successive (or concurrent) `alloc` calls
185 /// never overlap.
186 buffer: UnsafeCell<Box<[u8]>>,
187 /// Atomic bump cursor: the next free byte offset within `buffer`.
188 cursor: AtomicUsize,
189 /// Atomic allocation counter.
190 alloc_count: AtomicU32,
191 /// Cached cap (== `pool.cap_per_arena` at lease time).
192 cap: usize,
193 /// Cached cap (== `pool.max_alloc_count_per_arena` at lease time).
194 alloc_count_cap: u32,
195 /// Weak handle back to the pool so `Drop` can return the buffer.
196 pool: Weak<ArenaPool>,
197}
198
199// SAFETY: `Arena` owns its `Box<[u8]>` (no shared ownership of the
200// buffer with anything else) and all mutation goes through the atomic
201// cursor + the (correctly synchronised) `UnsafeCell`. The public
202// `alloc` API uses a CAS loop so concurrent allocators get disjoint
203// regions; the `Drop` path moves the buffer out under exclusive
204// access (`&mut self`). Sending the arena across threads is therefore
205// sound.
206unsafe impl Send for Arena {}
207// SAFETY: `&Arena::alloc` mutates only via the atomic cursor and the
208// allocation counter (themselves `Sync`) and writes into a region of
209// the `UnsafeCell` buffer that no other in-flight call has been
210// handed (CAS guarantees disjoint regions). Two threads holding
211// `&Arena` therefore cannot produce overlapping `&mut [T]` slices.
212unsafe impl Sync for Arena {}
213
214impl Arena {
215 /// Capacity of this arena in bytes.
216 pub fn capacity(&self) -> usize {
217 self.cap
218 }
219
220 /// Bytes consumed by allocations so far.
221 pub fn used(&self) -> usize {
222 self.cursor.load(Ordering::Acquire)
223 }
224
225 /// Number of allocations performed so far.
226 pub fn alloc_count(&self) -> u32 {
227 self.alloc_count.load(Ordering::Acquire)
228 }
229
230 /// `true` once the per-arena allocation-count cap has been
231 /// reached. Decoders that produce many small allocations should
232 /// poll this and bail with [`Error::ResourceExhausted`] when it
233 /// flips, instead of waiting for the next [`Arena::alloc`] call
234 /// to fail.
235 pub fn alloc_count_exceeded(&self) -> bool {
236 self.alloc_count.load(Ordering::Acquire) >= self.alloc_count_cap
237 }
238
239 /// Allocate `count` `T`s out of this arena. Returns a borrowed
240 /// `&mut [T]` (lifetime bounded by the borrow of `self`). The
241 /// bytes are not zero-initialised — the caller is responsible
242 /// for fully writing the returned slice before reading it.
243 ///
244 /// Returns [`Error::ResourceExhausted`] if either the per-arena
245 /// byte cap or the per-arena allocation-count cap would be
246 /// exceeded.
247 ///
248 /// # Safety / contract
249 ///
250 /// `T` must be a "plain old data" type with no `Drop` glue and
251 /// no invariants that need a constructor — typically `u8`, `i16`,
252 /// `u32`, `f32`, etc. The arena does not run destructors on
253 /// allocated values. This is enforced via a `T: Copy` bound.
254 ///
255 /// **Concurrency:** the bump cursor is advanced via a CAS loop,
256 /// so concurrent `alloc` calls on the same `&Arena` produce
257 /// disjoint slices. The CAS loser retries against the new
258 /// cursor; in the uncontended case the cost is a single relaxed
259 /// load plus one successful CAS.
260 #[allow(clippy::mut_from_ref)] // see "Concurrency" doc above.
261 pub fn alloc<T>(&self, count: usize) -> Result<&mut [T]>
262 where
263 T: Copy,
264 {
265 // Allocation-count cap. Increment first; if we overshoot,
266 // roll back so subsequent calls still see the correct value.
267 let prev_count = self.alloc_count.fetch_add(1, Ordering::AcqRel);
268 if prev_count >= self.alloc_count_cap {
269 // Roll back so `alloc_count_exceeded()` keeps returning
270 // a stable cap value rather than drifting upward.
271 self.alloc_count.fetch_sub(1, Ordering::AcqRel);
272 return Err(Error::resource_exhausted(format!(
273 "Arena alloc-count cap of {} exceeded",
274 self.alloc_count_cap
275 )));
276 }
277
278 let elem_size = size_of::<T>();
279 let elem_align = align_of::<T>();
280 // Bytes requested.
281 let bytes = elem_size.checked_mul(count).ok_or_else(|| {
282 // Roll back the alloc-count bump on size-overflow too.
283 self.alloc_count.fetch_sub(1, Ordering::AcqRel);
284 Error::resource_exhausted("Arena alloc size overflow".to_string())
285 })?;
286
287 // CAS loop on the cursor. We compute aligned + new_cursor
288 // from the latest observed cursor value, then attempt to
289 // claim that range; if another thread won the race, retry
290 // against the updated cursor.
291 let mut current = self.cursor.load(Ordering::Acquire);
292 let aligned;
293 let new_cursor;
294 loop {
295 let candidate_aligned = match align_up(current, elem_align) {
296 Some(a) => a,
297 None => {
298 self.alloc_count.fetch_sub(1, Ordering::AcqRel);
299 return Err(Error::resource_exhausted(
300 "Arena cursor alignment overflow".to_string(),
301 ));
302 }
303 };
304 let candidate_new = match candidate_aligned.checked_add(bytes) {
305 Some(n) => n,
306 None => {
307 self.alloc_count.fetch_sub(1, Ordering::AcqRel);
308 return Err(Error::resource_exhausted(
309 "Arena cursor advance overflow".to_string(),
310 ));
311 }
312 };
313
314 if candidate_new > self.cap {
315 self.alloc_count.fetch_sub(1, Ordering::AcqRel);
316 return Err(Error::resource_exhausted(format!(
317 "Arena cap of {} bytes exceeded (would consume {} bytes)",
318 self.cap, candidate_new
319 )));
320 }
321
322 match self.cursor.compare_exchange_weak(
323 current,
324 candidate_new,
325 Ordering::AcqRel,
326 Ordering::Acquire,
327 ) {
328 Ok(_) => {
329 aligned = candidate_aligned;
330 new_cursor = candidate_new;
331 let _ = new_cursor; // silence "unused" if optimised
332 break;
333 }
334 Err(observed) => {
335 current = observed;
336 // Retry with the freshly observed cursor.
337 }
338 }
339 }
340
341 // SAFETY: we have just CAS-claimed the byte range
342 // `aligned..aligned+bytes`. No other in-flight `alloc` call
343 // can claim any byte inside that range (the cursor is
344 // monotonically non-decreasing under successful CAS, so a
345 // subsequent winner observes a `current` >= our `new_cursor`
346 // and therefore proposes a `candidate_aligned` >=
347 // `new_cursor`). The buffer outlives the borrow of `&self`.
348 // T: Copy guarantees we don't need to drop previous contents.
349 let slice: &mut [T] = unsafe {
350 let buf_ptr = (*self.buffer.get()).as_mut_ptr();
351 let elem_ptr = buf_ptr.add(aligned).cast::<T>();
352 std::slice::from_raw_parts_mut(elem_ptr, count)
353 };
354
355 Ok(slice)
356 }
357
358 /// Reset the arena to empty without releasing its buffer to the
359 /// pool. Useful for a decoder that wants to reuse the same arena
360 /// across several intermediate stages of the same frame. Callers
361 /// must ensure no slice previously returned from [`Arena::alloc`]
362 /// is still in use — Rust's borrow checker enforces this, since
363 /// `reset` takes `&mut self`.
364 pub fn reset(&mut self) {
365 // `&mut self` proves exclusive access; non-atomic stores
366 // would suffice, but the atomic API is uniform.
367 self.cursor.store(0, Ordering::Release);
368 self.alloc_count.store(0, Ordering::Release);
369 }
370}
371
372impl Drop for Arena {
373 fn drop(&mut self) {
374 // Take the buffer out of the UnsafeCell. We're in Drop with
375 // `&mut self`, so no other references to it can exist.
376 let buffer = std::mem::replace(
377 unsafe { &mut *self.buffer.get() },
378 Vec::new().into_boxed_slice(),
379 );
380 if let Some(pool) = self.pool.upgrade() {
381 pool.release(buffer);
382 }
383 // else: pool was dropped before us — buffer drops here.
384 }
385}
386
/// Smallest multiple of `align` that is greater than or equal to `n`,
/// or `None` if computing it would overflow `usize`. `align` must be a
/// power of two (checked in debug builds only).
fn align_up(n: usize, align: usize) -> Option<usize> {
    debug_assert!(align.is_power_of_two(), "alignment must be a power of two");
    // For a power of two, `align - 1` has exactly the low bits set
    // that must be zero in an aligned value.
    let low_bits = align - 1;
    let bumped = n.checked_add(low_bits)?;
    Some(bumped & !low_bits)
}
394
395/// The owned body of a refcounted [`Frame`]. `Send + Sync`.
396///
397/// Holds a [`sync::Arena`](Arena) (the bytes), a fixed-size table of
398/// `(offset_in_arena, length_in_bytes)` pairs (one per plane), and a
399/// [`FrameHeader`]. The `plane_count` field tracks how many entries
400/// of `plane_offsets` are actually populated. Up to [`MAX_PLANES`]
401/// planes are supported.
402///
403/// **Lifetime:** an [`Arena`] returns its buffer to the pool when
404/// dropped. An `Arc<FrameInner>` keeps the arena alive via its single
405/// owned field, so as long as any clone of a [`Frame`] exists the
406/// underlying buffer stays out of the pool.
407pub struct FrameInner {
408 arena: Arena,
409 plane_offsets: [(usize, usize); MAX_PLANES],
410 plane_count: u8,
411 header: FrameHeader,
412}
413
414/// Refcounted handle to a decoded video frame. `Send + Sync`.
415///
416/// Construct via [`FrameInner::new`]; clone freely (each clone bumps
417/// the atomic refcount by 1). The arena and its buffer are released
418/// back to the pool when the last clone is dropped.
419///
420/// Use this type when the decoder hands frames to a different thread
421/// from the one that produced them. For same-thread decode/consume,
422/// the cheaper [`crate::arena::Frame`] (`Rc`-backed) is preferable.
423pub type Frame = Arc<FrameInner>;
424
425impl FrameInner {
426 /// Construct a `Frame` (`Arc<FrameInner>`) from an arena, a slice
427 /// of `(offset, length)` plane descriptors, and a header. Returns
428 /// [`Error::InvalidData`] if more than [`MAX_PLANES`] planes are
429 /// supplied or if any plane range falls outside the arena's used
430 /// region.
431 pub fn new(arena: Arena, planes: &[(usize, usize)], header: FrameHeader) -> Result<Frame> {
432 if planes.len() > MAX_PLANES {
433 return Err(Error::invalid(format!(
434 "FrameInner supports at most {} planes (got {})",
435 MAX_PLANES,
436 planes.len()
437 )));
438 }
439 let used = arena.used();
440 for (i, (off, len)) in planes.iter().enumerate() {
441 let end = off
442 .checked_add(*len)
443 .ok_or_else(|| Error::invalid(format!("plane {i}: offset+len overflow")))?;
444 if end > used {
445 return Err(Error::invalid(format!(
446 "plane {i}: range {off}..{end} exceeds arena used={used}"
447 )));
448 }
449 }
450 let mut plane_offsets = [(0usize, 0usize); MAX_PLANES];
451 for (i, p) in planes.iter().enumerate() {
452 plane_offsets[i] = *p;
453 }
454 Ok(Arc::new(FrameInner {
455 arena,
456 plane_offsets,
457 plane_count: planes.len() as u8,
458 header,
459 }))
460 }
461
462 /// Number of planes this frame holds.
463 pub fn plane_count(&self) -> usize {
464 self.plane_count as usize
465 }
466
467 /// Read-only access to plane `i`. Returns `None` if `i` is out of
468 /// range.
469 pub fn plane(&self, i: usize) -> Option<&[u8]> {
470 if i >= self.plane_count as usize {
471 return None;
472 }
473 let (off, len) = self.plane_offsets[i];
474 // SAFETY: plane ranges were validated against `arena.used()`
475 // at construction; no allocation has touched bytes inside any
476 // already-handed-out region (cursor only advances). We borrow
477 // with the lifetime of `&self`.
478 let buf: &[u8] = unsafe {
479 let buf_ref = &*self.arena.buffer.get();
480 &(**buf_ref)[off..off + len]
481 };
482 Some(buf)
483 }
484
485 /// Frame header (width / height / pixel format / pts).
486 pub fn header(&self) -> &FrameHeader {
487 &self.header
488 }
489}
490
#[cfg(test)]
mod tests {
    use super::*;
    use crate::format::PixelFormat;

    /// Compile-time probe: only type-checks when `T: Send + Sync`.
    fn assert_send_sync<T: Send + Sync>() {}

    #[test]
    fn types_are_send_sync() {
        // This is the reason the module exists: every public type must
        // meet the cross-thread contract that `crate::arena` does not.
        assert_send_sync::<ArenaPool>();
        assert_send_sync::<Arc<ArenaPool>>();
        assert_send_sync::<Arena>();
        assert_send_sync::<FrameInner>();
        assert_send_sync::<Frame>();
    }

    /// Shorthand for a pool with the default allocation-count cap.
    fn small_pool(slots: usize, cap: usize) -> Arc<ArenaPool> {
        ArenaPool::new(slots, cap)
    }

    #[test]
    fn pool_lease_returns_err_when_exhausted() {
        let pool = small_pool(2, 1024);
        let first = pool.lease().expect("first lease");
        let second = pool.lease().expect("second lease");
        // Both slots are out, so a third lease must be refused.
        assert!(matches!(pool.lease(), Err(Error::ResourceExhausted(_))));
        drop(first);
        drop(second);
    }

    #[test]
    fn arena_alloc_caps_at_size_limit() {
        let pool = small_pool(1, 64);
        let arena = pool.lease().unwrap();
        // Two 32-byte allocations fill the 64-byte arena exactly.
        for _ in 0..2 {
            let _: &mut [u8] = arena.alloc::<u8>(32).unwrap();
        }
        let overflow = arena.alloc::<u8>(1);
        assert!(matches!(overflow, Err(Error::ResourceExhausted(_))));
    }

    #[test]
    fn arena_alloc_count_cap_fires() {
        let pool = ArenaPool::with_alloc_count_cap(1, 1024, 3);
        let arena = pool.lease().unwrap();
        for _ in 0..3 {
            let _: &mut [u8] = arena.alloc::<u8>(1).unwrap();
        }
        assert!(arena.alloc_count_exceeded());
        let refused = arena.alloc::<u8>(1);
        assert!(matches!(refused, Err(Error::ResourceExhausted(_))));
        // A refused allocation must leave the counter at the cap.
        assert_eq!(arena.alloc_count(), 3);
    }

    #[test]
    fn arena_returns_to_pool_on_drop() {
        let pool = small_pool(1, 256);
        let leased = pool.lease().expect("first lease");
        assert!(matches!(pool.lease(), Err(Error::ResourceExhausted(_))));
        drop(leased);
        let _again = pool.lease().expect("re-lease after drop");
    }

    #[test]
    fn arena_alignment_is_respected() {
        let pool = small_pool(1, 64);
        let arena = pool.lease().unwrap();
        // Knock the cursor off a 4-byte boundary first.
        let _: &mut [u8] = arena.alloc::<u8>(1).unwrap();
        let words: &mut [u32] = arena.alloc::<u32>(4).unwrap();
        assert_eq!((words.as_ptr() as usize) % align_of::<u32>(), 0);
        assert_eq!(words.len(), 4);
    }

    /// Lease an arena, write a 16-byte plane holding `0..16`, and
    /// freeze it into a single-plane frame.
    fn build_simple_frame(pool: &Arc<ArenaPool>) -> Frame {
        let arena = pool.lease().unwrap();
        let pixels: &mut [u8] = arena.alloc::<u8>(16).unwrap();
        for (px, value) in pixels.iter_mut().zip(0u8..) {
            *px = value;
        }
        let header = FrameHeader::new(4, 4, PixelFormat::Gray8, Some(42));
        FrameInner::new(arena, &[(0, 16)], header).unwrap()
    }

    #[test]
    fn frame_refcount_keeps_arena_alive() {
        let pool = small_pool(1, 256);
        let original = build_simple_frame(&pool);
        let survivor = Arc::clone(&original);
        drop(original);

        let plane = survivor.plane(0).expect("plane 0");
        assert_eq!(plane.len(), 16);
        for (i, byte) in plane.iter().enumerate() {
            assert_eq!(*byte, i as u8);
        }

        let header = survivor.header();
        assert_eq!(header.width, 4);
        assert_eq!(header.height, 4);
        assert_eq!(header.presentation_timestamp, Some(42));
        // The surviving clone still pins the only arena slot.
        assert!(matches!(pool.lease(), Err(Error::ResourceExhausted(_))));
    }

    #[test]
    fn last_drop_returns_arena_to_pool() {
        let pool = small_pool(1, 256);
        let original = build_simple_frame(&pool);
        let second = Arc::clone(&original);
        drop(original);
        drop(second);
        let _again = pool.lease().expect("lease after last drop");
    }

    #[test]
    fn frame_rejects_too_many_planes() {
        let pool = small_pool(1, 256);
        let arena = pool.lease().unwrap();
        let header = FrameHeader::new(1, 1, PixelFormat::Gray8, None);
        let descriptors = vec![(0usize, 0usize); MAX_PLANES + 1];
        let result = FrameInner::new(arena, &descriptors, header);
        assert!(matches!(result, Err(Error::InvalidData(_))));
    }

    #[test]
    fn frame_rejects_plane_outside_arena() {
        let pool = small_pool(1, 64);
        let arena = pool.lease().unwrap();
        let header = FrameHeader::new(1, 1, PixelFormat::Gray8, None);
        // Nothing was allocated, so any non-empty plane is out of range.
        let result = FrameInner::new(arena, &[(0, 16)], header);
        assert!(matches!(result, Err(Error::InvalidData(_))));
    }

    #[test]
    fn pool_outlives_buffer_drop_when_pool_dropped_first() {
        // Dropping the arena after its pool must not panic.
        let pool = small_pool(1, 64);
        let arena = pool.lease().unwrap();
        drop(pool);
        drop(arena);
    }

    #[test]
    fn arena_reset_clears_allocations() {
        let pool = small_pool(1, 32);
        let mut arena = pool.lease().unwrap();
        let _: &mut [u8] = arena.alloc::<u8>(32).unwrap();
        let when_full = arena.alloc::<u8>(1);
        assert!(matches!(when_full, Err(Error::ResourceExhausted(_))));
        arena.reset();
        let _: &mut [u8] = arena.alloc::<u8>(32).unwrap();
    }

    #[test]
    fn frame_can_be_sent_across_thread_boundary() {
        // Build a frame here, move a clone to a worker thread and read
        // its bytes there. If this stops compiling, the `Send + Sync`
        // impls above are wrong.
        let pool = small_pool(1, 256);
        let frame = build_simple_frame(&pool);
        let shipped = Arc::clone(&frame);
        let worker = std::thread::spawn(move || {
            shipped
                .plane(0)
                .expect("plane 0 on worker")
                .iter()
                .map(|&byte| u32::from(byte))
                .sum::<u32>()
        });
        let total = worker.join().expect("worker joined");
        // The plane holds 0..16, which sums to 120.
        assert_eq!(total, (0..16u32).sum::<u32>());
        // The original handle is still readable on this thread.
        assert_eq!(frame.plane(0).unwrap().len(), 16);
    }

    #[test]
    fn concurrent_alloc_produces_disjoint_slices() {
        // Two threads each take 64 bytes from one 256-byte arena; the
        // address ranges they get must not intersect.
        let pool = small_pool(1, 256);
        let shared = Arc::new(pool.lease().unwrap());
        let fill_on_thread = |arena: Arc<Arena>, fill: u8| {
            std::thread::spawn(move || {
                let chunk: &mut [u8] = arena.alloc::<u8>(64).unwrap();
                // Write the whole chunk so the other thread's writes
                // would clash if the ranges overlapped.
                chunk.iter_mut().for_each(|byte| *byte = fill);
                (chunk.as_ptr() as usize, chunk.len())
            })
        };
        let first = fill_on_thread(Arc::clone(&shared), 0xAA);
        let second = fill_on_thread(Arc::clone(&shared), 0xBB);
        let (start_a, len_a) = first.join().unwrap();
        let (start_b, len_b) = second.join().unwrap();
        // [start_a, start_a+len_a) and [start_b, start_b+len_b) must
        // not intersect.
        let no_overlap = start_a + len_a <= start_b || start_b + len_b <= start_a;
        assert!(no_overlap, "concurrent alloc returned overlapping slices");
    }
}