use crate::record::finalizer::{Finalizer, FinalizerStack};
use crate::runtime::region_heap::{HeapIndex, RegionHeap};
use crate::tracing_compat::{Span, debug, info_span};
use crate::types::rref::{RRef, RRefAccessWitness, RRefError};
use crate::types::{Budget, CancelReason, CurveBudget, RRefAccess, RegionId, TaskId, Time};
use parking_lot::RwLock;
use std::sync::atomic::{AtomicU8, Ordering};
/// Shared close-notification slot, handed out via `RegionRecord::close_notify`.
///
/// A waiter stores its waker here; when the region reaches `Closed`,
/// `closed` is flipped to `true` and the stored waker (if any) is woken.
#[derive(Debug)]
pub struct RegionCloseState {
/// Set to `true` exactly once, when the region finishes closing.
pub closed: bool,
/// Waker registered by a task awaiting close; taken and woken on close.
pub waker: Option<std::task::Waker>,
}
/// Lifecycle state of a region.
///
/// Legal forward progression: `Open` → `Closing` → (`Draining` →) `Finalizing` → `Closed`.
/// The `u8` encoding (see [`Self::as_u8`]) is what `AtomicRegionState` stores.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionState {
    Open,
    Closing,
    Draining,
    Finalizing,
    Closed,
}

impl RegionState {
    /// Encodes the state as its wire/atomic byte (0..=4, declaration order).
    #[must_use]
    pub const fn as_u8(self) -> u8 {
        // Variants carry no payload and use default discriminants, so the
        // cast yields exactly 0..=4 in declaration order.
        self as u8
    }

    /// Decodes a byte produced by [`Self::as_u8`]; `None` for anything else.
    #[must_use]
    pub const fn from_u8(value: u8) -> Option<Self> {
        let state = match value {
            0 => Self::Open,
            1 => Self::Closing,
            2 => Self::Draining,
            3 => Self::Finalizing,
            4 => Self::Closed,
            _ => return None,
        };
        Some(state)
    }

    /// `true` only for `Closed` — no further transitions are possible.
    #[must_use]
    pub const fn is_terminal(self) -> bool {
        match self {
            Self::Closed => true,
            _ => false,
        }
    }

    /// New child regions may only be spawned while `Open`.
    #[must_use]
    pub const fn can_spawn(self) -> bool {
        match self {
            Self::Open => true,
            _ => false,
        }
    }

    /// Tasks/obligations are admitted while `Open`, and also while
    /// `Finalizing` (cleanup work runs during finalization).
    #[must_use]
    pub const fn can_accept_work(self) -> bool {
        match self {
            Self::Open | Self::Finalizing => true,
            _ => false,
        }
    }

    /// `true` only for the `Draining` phase.
    #[must_use]
    pub const fn is_draining(self) -> bool {
        match self {
            Self::Draining => true,
            _ => false,
        }
    }

    /// `true` for every in-between phase, i.e. anything that is neither
    /// still `Open` nor already `Closed`.
    #[must_use]
    pub const fn is_closing(self) -> bool {
        !matches!(self, Self::Open | Self::Closed)
    }
}
/// Per-region admission limits. Every field is an upper bound on a live
/// count; `None` means that dimension is unbounded.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RegionLimits {
/// Maximum number of live child regions.
pub max_children: Option<usize>,
/// Maximum number of live tasks (cleanup tasks may bypass this while finalizing).
pub max_tasks: Option<usize>,
/// Maximum number of simultaneously pending obligations.
pub max_obligations: Option<usize>,
/// Maximum live bytes in the region heap.
pub max_heap_bytes: Option<usize>,
/// Optional rate/curve budget attached to the region.
pub curve_budget: Option<CurveBudget>,
}
impl RegionLimits {
    /// Limit set with every bound disabled.
    pub const UNLIMITED: Self = Self {
        max_children: None,
        max_tasks: None,
        max_obligations: None,
        max_heap_bytes: None,
        curve_budget: None,
    };

    /// Convenience constructor equivalent to [`Self::UNLIMITED`].
    #[must_use]
    pub const fn unlimited() -> Self {
        Self::UNLIMITED
    }

    /// Builder-style setter: returns `self` with the given curve budget attached.
    #[must_use]
    pub fn with_curve_budget(self, curve_budget: CurveBudget) -> Self {
        Self {
            curve_budget: Some(curve_budget),
            ..self
        }
    }

    /// Builder-style setter: returns `self` with any curve budget removed.
    #[must_use]
    pub fn without_curve_budget(self) -> Self {
        Self {
            curve_budget: None,
            ..self
        }
    }
}
impl Default for RegionLimits {
    /// Defaults to fully unbounded limits.
    fn default() -> Self {
        Self::unlimited()
    }
}
/// Which limited resource an admission check was performed against.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AdmissionKind {
/// Child-region slot (`RegionLimits::max_children`).
Child,
/// Task slot (`RegionLimits::max_tasks`).
Task,
/// Pending obligation slot (`RegionLimits::max_obligations`).
Obligation,
/// Region-heap byte budget (`RegionLimits::max_heap_bytes`).
HeapBytes,
}
/// Reason a region refused to admit new work or allocations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AdmissionError {
/// The region's state no longer accepts this kind of work.
Closed,
/// A configured limit would be exceeded.
LimitReached {
/// Which resource hit its cap.
kind: AdmissionKind,
/// The configured cap.
limit: usize,
/// The live count observed at rejection time.
live: usize,
},
}
/// Lock-free cell holding a `RegionState` encoded via `RegionState::as_u8`.
#[derive(Debug)]
pub struct AtomicRegionState {
// Always holds a byte produced by `RegionState::as_u8` (0..=4).
inner: AtomicU8,
}
impl AtomicRegionState {
    /// Wraps `state` in an atomic cell.
    #[must_use]
    pub fn new(state: RegionState) -> Self {
        let inner = AtomicU8::new(state.as_u8());
        Self { inner }
    }

    /// Reads the current state with `Acquire` ordering.
    ///
    /// # Panics
    ///
    /// Panics if the stored byte is not a valid `RegionState` encoding,
    /// which would indicate a bug (only `as_u8` values are ever stored).
    #[must_use]
    pub fn load(&self) -> RegionState {
        let raw = self.inner.load(Ordering::Acquire);
        RegionState::from_u8(raw).expect("invalid region state")
    }

    /// Unconditionally overwrites the state with `Release` ordering.
    pub fn store(&self, state: RegionState) {
        self.inner.store(state.as_u8(), Ordering::Release);
    }

    /// Atomically moves `from` → `to`; returns whether this caller won the
    /// compare-exchange (i.e. performed the transition).
    pub fn transition(&self, from: RegionState, to: RegionState) -> bool {
        let result = self.inner.compare_exchange(
            from.as_u8(),
            to.as_u8(),
            Ordering::AcqRel,
            Ordering::Acquire,
        );
        result.is_ok()
    }
}
/// Mutable region bookkeeping, guarded by `RegionRecord::inner`'s `RwLock`.
#[derive(Debug)]
struct RegionInner {
// Current execution budget for the region.
budget: Budget,
// Live child region ids; membership is kept unique by `add_child`.
children: Vec<RegionId>,
// Live task ids; membership is kept unique by `add_task_internal`.
tasks: Vec<TaskId>,
// LIFO stack of finalizers run during the `Finalizing` phase.
finalizers: FinalizerStack,
// Strongest cancellation reason seen so far, if any.
cancel_reason: Option<CancelReason>,
// Admission limits currently in force.
limits: RegionLimits,
// Count of reserved-but-unresolved obligations.
pending_obligations: usize,
// Region-owned heap; reclaimed when the region closes.
heap: RegionHeap,
}
/// Core record for one region: identity, lifecycle state machine, and all
/// mutable bookkeeping (children, tasks, obligations, finalizers, heap).
#[derive(Debug)]
pub struct RegionRecord {
    /// Stable identifier of this region.
    pub id: RegionId,
    /// Parent region, or `None` for a root region.
    pub parent: Option<RegionId>,
    /// Logical creation timestamp supplied at construction.
    pub created_at: Time,
    /// Shared close-notification slot; flipped and woken on close.
    pub close_notify: std::sync::Arc<parking_lot::Mutex<RegionCloseState>>,
    /// Lock-free lifecycle state machine.
    state: AtomicRegionState,
    /// Mutable bookkeeping guarded by a read-write lock.
    inner: RwLock<RegionInner>,
    // Previously this field was declared twice behind complementary
    // `#[cfg(feature = "tracing-integration")]` attributes with the exact
    // same type (`Span` is the `tracing_compat` alias, which itself handles
    // the feature switch), so the cfg split was redundant; collapsed into a
    // single unconditional field.
    span: Span,
}
impl RegionRecord {
/// Creates a record with `created_at` fixed to `Time::ZERO`.
#[must_use]
pub fn new(id: RegionId, parent: Option<RegionId>, budget: Budget) -> Self {
Self::new_with_time(id, parent, budget, Time::ZERO)
}
/// Creates a record in the `Open` state with the given creation timestamp,
/// empty children/tasks/finalizers, unlimited limits, a fresh heap, and a
/// dedicated tracing span whose `state` field tracks later transitions.
#[must_use]
pub fn new_with_time(
id: RegionId,
parent: Option<RegionId>,
budget: Budget,
created_at: Time,
) -> Self {
// One span per region; `trace_state_change` re-records its "state" field.
let span = info_span!(
"region",
region_id = ?id,
parent_region_id = ?parent,
state = "Open",
initial_budget_deadline = ?budget.deadline,
initial_budget_poll_quota = budget.poll_quota,
);
debug!(
parent: &span,
region_id = ?id,
parent_region_id = ?parent,
state = "Open",
budget_deadline = ?budget.deadline,
budget_poll_quota = budget.poll_quota,
"region created"
);
Self {
id,
parent,
created_at,
close_notify: std::sync::Arc::new(parking_lot::Mutex::new(RegionCloseState {
closed: false,
waker: None,
})),
state: AtomicRegionState::new(RegionState::Open),
inner: RwLock::new(RegionInner {
budget,
children: Vec::new(),
tasks: Vec::new(),
finalizers: FinalizerStack::new(),
cancel_reason: None,
limits: RegionLimits::UNLIMITED,
pending_obligations: 0,
heap: RegionHeap::new(),
}),
span,
}
}
/// Logical creation timestamp supplied at construction.
#[must_use]
pub const fn created_at(&self) -> Time {
self.created_at
}
/// Current lifecycle state (atomic load; may be stale by the time it is used).
#[inline]
#[must_use]
pub fn state(&self) -> RegionState {
self.state.load()
}
/// Copy of the current budget.
#[inline]
#[must_use]
pub fn budget(&self) -> Budget {
self.inner.read().budget
}
/// Replaces the budget wholesale.
pub fn set_budget(&self, budget: Budget) {
self.inner.write().budget = budget;
}
/// Clone of the current admission limits.
#[inline]
#[must_use]
pub fn limits(&self) -> RegionLimits {
self.inner.read().limits.clone()
}
/// Replaces the admission limits wholesale.
pub fn set_limits(&self, limits: RegionLimits) {
self.inner.write().limits = limits;
}
/// Number of reserved-but-unresolved obligations.
#[inline]
#[must_use]
pub fn pending_obligations(&self) -> usize {
self.inner.read().pending_obligations
}
/// Clone of the strongest cancellation reason recorded so far, if any.
#[must_use]
pub fn cancel_reason(&self) -> Option<CancelReason> {
self.inner.read().cancel_reason.clone()
}
/// Records `reason`, strengthening any existing reason rather than replacing it.
pub fn strengthen_cancel_reason(&self, reason: CancelReason) {
let mut inner = self.inner.write();
if let Some(existing) = &mut inner.cancel_reason {
existing.strengthen(&reason);
} else {
inner.cancel_reason = Some(reason);
}
}
/// Number of live child regions.
#[inline]
#[must_use]
pub fn child_count(&self) -> usize {
self.inner.read().children.len()
}
/// Snapshot of the live child ids (allocates a fresh `Vec`).
#[must_use]
pub fn child_ids(&self) -> Vec<RegionId> {
self.inner.read().children.clone()
}
/// Appends the live child ids into a caller-provided buffer (allocation-free
/// alternative to [`Self::child_ids`] when the buffer is reused).
#[inline]
pub fn copy_child_ids_into(&self, buf: &mut Vec<RegionId>) {
let inner = self.inner.read();
buf.extend_from_slice(&inner.children);
}
/// Number of live tasks.
#[inline]
#[must_use]
pub fn task_count(&self) -> usize {
self.inner.read().tasks.len()
}
/// Snapshot of the live task ids (allocates a fresh `Vec`).
#[must_use]
pub fn task_ids(&self) -> Vec<TaskId> {
self.inner.read().tasks.clone()
}
/// Appends the live task ids into a caller-provided buffer.
#[inline]
pub fn copy_task_ids_into(&self, buf: &mut Vec<TaskId>) {
let inner = self.inner.read();
buf.extend_from_slice(&inner.tasks);
}
/// Snapshot of the live task ids in a `SmallVec` (inline storage for ≤ 8).
#[inline]
#[must_use]
pub fn task_ids_small(&self) -> smallvec::SmallVec<[TaskId; 8]> {
let inner = self.inner.read();
smallvec::SmallVec::from_slice(&inner.tasks)
}
/// `true` while any child, task, or pending obligation remains.
/// Note: finalizers do NOT count as live work here (contrast `is_quiescent`).
#[inline]
#[must_use]
pub fn has_live_work(&self) -> bool {
let inner = self.inner.read();
!inner.children.is_empty() || !inner.tasks.is_empty() || inner.pending_obligations > 0
}
/// Registers a child region, enforcing `max_children`.
///
/// Idempotent: re-adding a present child is `Ok(())` without consuming a slot.
/// Fails with `Closed` unless the state still allows spawning (`Open` only).
pub fn add_child(&self, child: RegionId) -> Result<(), AdmissionError> {
// Cheap pre-check before taking the write lock.
if !self.state.load().can_spawn() {
return Err(AdmissionError::Closed);
}
let mut inner = self.inner.write();
// Re-check under the lock to narrow the race with a concurrent close
// (the state CAS in `begin_close` is not performed while holding `inner`).
if !self.state.load().can_spawn() {
return Err(AdmissionError::Closed);
}
if inner.children.contains(&child) {
return Ok(());
}
if let Some(limit) = inner.limits.max_children {
if inner.children.len() >= limit {
return Err(AdmissionError::LimitReached {
kind: AdmissionKind::Child,
limit,
live: inner.children.len(),
});
}
}
inner.children.push(child);
drop(inner);
Ok(())
}
/// Unregisters a child region; no-op if it was not present.
pub fn remove_child(&self, child: RegionId) {
let mut inner = self.inner.write();
inner.children.retain(|&c| c != child);
}
/// Shared implementation for `add_task`/`add_cleanup_task`. When
/// `bypass_task_limit_in_finalizing` is set and the region is `Finalizing`,
/// `max_tasks` is skipped so cleanup work can always be scheduled.
fn add_task_internal(
&self,
task: TaskId,
bypass_task_limit_in_finalizing: bool,
) -> Result<(), AdmissionError> {
let state = self.state.load();
if !state.can_accept_work() {
return Err(AdmissionError::Closed);
}
let mut inner = self.inner.write();
// Re-check under the lock (same admission/close race as `add_child`).
let state = self.state.load();
if !state.can_accept_work() {
return Err(AdmissionError::Closed);
}
// Idempotent: a task already registered does not consume another slot.
if inner.tasks.contains(&task) {
return Ok(());
}
let bypass_limit = bypass_task_limit_in_finalizing && state == RegionState::Finalizing;
if !bypass_limit {
if let Some(limit) = inner.limits.max_tasks {
if inner.tasks.len() >= limit {
return Err(AdmissionError::LimitReached {
kind: AdmissionKind::Task,
limit,
live: inner.tasks.len(),
});
}
}
}
inner.tasks.push(task);
drop(inner);
Ok(())
}
/// Registers a regular task, enforcing `max_tasks`.
pub fn add_task(&self, task: TaskId) -> Result<(), AdmissionError> {
self.add_task_internal(task, false)
}
/// Registers a cleanup task; bypasses `max_tasks` while `Finalizing`.
pub fn add_cleanup_task(&self, task: TaskId) -> Result<(), AdmissionError> {
self.add_task_internal(task, true)
}
/// Unregisters a task; no-op if it was not present.
pub fn remove_task(&self, task: TaskId) {
let mut inner = self.inner.write();
inner.tasks.retain(|&t| t != task);
}
/// Reserves one obligation slot, enforcing `max_obligations`.
pub fn try_reserve_obligation(&self) -> Result<(), AdmissionError> {
if !self.state.load().can_accept_work() {
return Err(AdmissionError::Closed);
}
let mut inner = self.inner.write();
// Re-check under the lock (same admission/close race as `add_child`).
if !self.state.load().can_accept_work() {
return Err(AdmissionError::Closed);
}
if let Some(limit) = inner.limits.max_obligations {
if inner.pending_obligations >= limit {
return Err(AdmissionError::LimitReached {
kind: AdmissionKind::Obligation,
limit,
live: inner.pending_obligations,
});
}
}
inner.pending_obligations += 1;
drop(inner);
Ok(())
}
/// Releases one obligation slot; saturates at zero (extra resolves are no-ops).
pub fn resolve_obligation(&self) {
let mut inner = self.inner.write();
inner.pending_obligations = inner.pending_obligations.saturating_sub(1);
}
/// Pushes a finalizer; they run LIFO via `pop_finalizer`.
pub fn add_finalizer(&self, finalizer: Finalizer) {
let mut inner = self.inner.write();
inner.finalizers.push(finalizer);
}
/// Pops the most recently added finalizer, or `None` when the stack is empty.
pub fn pop_finalizer(&self) -> Option<Finalizer> {
let mut inner = self.inner.write();
inner.finalizers.pop()
}
/// Number of registered finalizers.
#[must_use]
pub fn finalizer_count(&self) -> usize {
self.inner.read().finalizers.len()
}
/// `true` when no finalizers are registered.
#[must_use]
pub fn finalizers_empty(&self) -> bool {
self.inner.read().finalizers.is_empty()
}
/// Allocates `value` on the region heap, enforcing `max_heap_bytes`.
///
/// The byte accounting uses `size_of::<T>()` as the size hint, compared
/// against the heap's live-byte counter. Fails with `Closed` once the
/// region is terminal.
pub fn heap_alloc<T: Send + Sync + 'static>(
&self,
value: T,
) -> Result<HeapIndex, AdmissionError> {
if self.state().is_terminal() {
return Err(AdmissionError::Closed);
}
let size_hint = std::mem::size_of::<T>();
let mut inner = self.inner.write();
// Re-check under the lock to narrow the race with a concurrent close.
if self.state().is_terminal() {
return Err(AdmissionError::Closed);
}
if let Some(limit) = inner.limits.max_heap_bytes {
let live_bytes = inner.heap.stats().bytes_live;
let requested = live_bytes.saturating_add(size_hint as u64);
if requested > limit as u64 {
let live = usize::try_from(live_bytes).unwrap_or(usize::MAX);
return Err(AdmissionError::LimitReached {
kind: AdmissionKind::HeapBytes,
limit,
live,
});
}
}
Ok(inner.heap.alloc(value))
}
/// Clones the value at `index` out of the heap; `None` if the slot is
/// invalid or holds a different type.
#[must_use]
pub fn heap_get<T>(&self, index: HeapIndex) -> Option<T>
where
T: Clone + 'static,
{
let inner = self.inner.read();
inner.heap.get::<T>(index).cloned()
}
/// Runs `f` against a borrow of the value at `index` without cloning;
/// `None` if the slot is invalid or holds a different type.
pub fn heap_with<T: 'static, R, F: FnOnce(&T) -> R>(
&self,
index: HeapIndex,
f: F,
) -> Option<R> {
let inner = self.inner.read();
inner.heap.get::<T>(index).map(f)
}
/// Number of live heap allocations.
#[must_use]
pub fn heap_len(&self) -> usize {
self.inner.read().heap.len()
}
/// Snapshot of the heap's statistics.
#[must_use]
pub fn heap_stats(&self) -> crate::runtime::region_heap::HeapStats {
self.inner.read().heap.stats()
}
/// `true` when nothing remains at all: no children, tasks, obligations,
/// or finalizers (stricter than `!has_live_work()`).
#[must_use]
pub fn is_quiescent(&self) -> bool {
let inner = self.inner.read();
inner.children.is_empty()
&& inner.tasks.is_empty()
&& inner.pending_obligations == 0
&& inner.finalizers.is_empty()
}
/// Records a cancellation request. Returns `true` if this was the first
/// reason recorded; otherwise strengthens the existing one and returns `false`.
pub fn cancel_request(&self, reason: CancelReason) -> bool {
let mut inner = self.inner.write();
if let Some(existing) = &mut inner.cancel_reason {
existing.strengthen(&reason);
false
} else {
inner.cancel_reason = Some(reason);
true
}
}
/// Starts closing: records/strengthens `reason` (unless already `Closed`,
/// which preserves the terminal reason), then attempts `Open` → `Closing`.
/// Returns whether this caller performed the transition.
pub fn begin_close(&self, reason: Option<CancelReason>) -> bool {
{
let mut inner = self.inner.write();
// A fully closed region keeps its original cancel reason.
if self.state.load() == RegionState::Closed {
return false;
}
if let Some(reason) = reason {
if let Some(existing) = &mut inner.cancel_reason {
existing.strengthen(&reason);
} else {
inner.cancel_reason = Some(reason);
}
}
}
// CAS outside the inner lock; only the winner traces the transition.
let transitioned = self
.state
.transition(RegionState::Open, RegionState::Closing);
if transitioned {
self.trace_state_change(RegionState::Closing);
}
transitioned
}
/// Attempts `Closing` → `Draining`; returns whether this caller won.
pub fn begin_drain(&self) -> bool {
let transitioned = self
.state
.transition(RegionState::Closing, RegionState::Draining);
if transitioned {
self.trace_state_change(RegionState::Draining);
}
transitioned
}
/// Attempts `Closing` → `Finalizing` or `Draining` → `Finalizing`
/// (draining is optional); returns whether this caller won.
pub fn begin_finalize(&self) -> bool {
let transitioned = self
.state
.transition(RegionState::Closing, RegionState::Finalizing)
|| self
.state
.transition(RegionState::Draining, RegionState::Finalizing);
if transitioned {
self.trace_state_change(RegionState::Finalizing);
}
transitioned
}
/// Attempts `Finalizing` → `Closed`, but only once fully quiescent
/// (no children, tasks, obligations, or finalizers). On success the heap
/// is reclaimed and any close-waiter is woken — after `inner` is dropped,
/// so the waker never runs while the region lock is held.
pub fn complete_close(&self) -> bool {
let mut inner = self.inner.write();
if !(inner.children.is_empty()
&& inner.tasks.is_empty()
&& inner.pending_obligations == 0
&& inner.finalizers.is_empty())
{
return false;
}
let transitioned = self
.state
.transition(RegionState::Finalizing, RegionState::Closed);
if transitioned {
self.trace_state_change(RegionState::Closed);
inner.heap.reclaim_all();
let waker = {
let mut notify = self.close_notify.lock();
notify.closed = true;
notify.waker.take()
};
drop(inner);
if let Some(waker) = waker {
waker.wake();
}
}
transitioned
}
/// Unconditionally overwrites the state, bypassing the CAS transition
/// checks. Intended for callers that already know the transition is valid.
pub fn set_state(&self, state: RegionState) {
self.state.store(state);
self.trace_state_change(state);
}
/// Logs the transition and re-records the span's `state` field.
fn trace_state_change(&self, new_state: RegionState) {
let state_name = match new_state {
RegionState::Open => "Open",
RegionState::Closing => "Closing",
RegionState::Draining => "Draining",
RegionState::Finalizing => "Finalizing",
RegionState::Closed => "Closed",
};
debug!(
parent: &self.span,
region_id = ?self.id,
state = state_name,
"region state transition"
);
self.span.record("state", state_name);
}
/// Drops every live heap allocation.
fn clear_heap(&self) {
let mut inner = self.inner.write();
inner.heap.reclaim_all();
}
/// Resolves `rref` against this region's heap and clones the value out.
/// Checks, in order: region identity, terminal state, allocation validity.
pub fn rref_get<T: Clone + 'static>(&self, rref: &RRef<T>) -> Result<T, RRefError> {
if rref.region_id() != self.id {
return Err(RRefError::RegionMismatch {
expected: rref.region_id(),
actual: self.id,
});
}
if self.state().is_terminal() {
return Err(RRefError::RegionClosed);
}
let inner = self.inner.read();
inner
.heap
.get::<T>(rref.heap_index())
.cloned()
.ok_or(RRefError::AllocationInvalid)
}
/// Like [`Self::rref_get`] but runs `f` against a borrow instead of cloning.
pub fn rref_with<T: 'static, R, F: FnOnce(&T) -> R>(
&self,
rref: &RRef<T>,
f: F,
) -> Result<R, RRefError> {
if rref.region_id() != self.id {
return Err(RRefError::RegionMismatch {
expected: rref.region_id(),
actual: self.id,
});
}
if self.state().is_terminal() {
return Err(RRefError::RegionClosed);
}
let inner = self.inner.read();
inner
.heap
.get::<T>(rref.heap_index())
.map(f)
.ok_or(RRefError::AllocationInvalid)
}
/// Issues an access witness for this region; refused once terminal.
pub fn access_witness(&self) -> Result<RRefAccessWitness, RRefError> {
if self.state().is_terminal() {
return Err(RRefError::RegionClosed);
}
Ok(RRefAccessWitness::new(self.id))
}
/// Witness-validated variant of [`Self::rref_get`]: additionally checks
/// that the witness belongs to this region and validates it against `rref`.
pub fn rref_get_with<T: Clone + 'static>(
&self,
rref: &RRef<T>,
witness: RRefAccessWitness,
) -> Result<T, RRefError> {
if witness.region() != self.id {
return Err(RRefError::WrongRegion);
}
rref.validate_witness(&witness)?;
if self.state().is_terminal() {
return Err(RRefError::RegionClosed);
}
let inner = self.inner.read();
inner
.heap
.get::<T>(rref.heap_index())
.cloned()
.ok_or(RRefError::AllocationInvalid)
}
/// Witness-validated variant of [`Self::rref_with`].
pub fn rref_with_witness<T: 'static, R, F: FnOnce(&T) -> R>(
&self,
rref: &RRef<T>,
witness: RRefAccessWitness,
f: F,
) -> Result<R, RRefError> {
if witness.region() != self.id {
return Err(RRefError::WrongRegion);
}
rref.validate_witness(&witness)?;
if self.state().is_terminal() {
return Err(RRefError::RegionClosed);
}
let inner = self.inner.read();
inner
.heap
.get::<T>(rref.heap_index())
.map(f)
.ok_or(RRefError::AllocationInvalid)
}
/// `true` while the region is still `Open` (close not yet started).
#[must_use]
pub fn should_begin_close(&self) -> bool {
let state = self.state();
matches!(state, RegionState::Open)
}
/// `true` while the region is in `Closing` (drain not yet started).
#[must_use]
pub fn should_begin_drain(&self) -> bool {
let state = self.state();
state == RegionState::Closing
}
/// `true` while finalization could legally begin (`Closing` or `Draining`).
#[must_use]
pub fn can_finalize(&self) -> bool {
let state = self.state();
matches!(state, RegionState::Closing | RegionState::Draining)
}
/// `true` while in `Finalizing`, the only state `complete_close` accepts.
#[must_use]
pub fn can_complete_close(&self) -> bool {
let state = self.state();
state == RegionState::Finalizing
}
/// `true` when `closed` reports true for every registered child
/// (vacuously true with no children).
#[must_use]
pub fn children_closed(&self, closed: &dyn Fn(RegionId) -> bool) -> bool {
let inner = self.inner.read();
inner.children.iter().all(|child| closed(*child))
}
/// `true` when `completed` reports true for every registered task
/// (vacuously true with no tasks).
#[must_use]
pub fn tasks_completed(&self, completed: &dyn Fn(TaskId) -> bool) -> bool {
let inner = self.inner.read();
inner.tasks.iter().all(|task| completed(*task))
}
/// `true` when no obligations remain pending.
#[must_use]
pub fn obligations_resolved(&self) -> bool {
self.pending_obligations() == 0
}
/// Single-lock check that finalization may start: no children, every task
/// completed per `completed`, and no pending obligations.
#[must_use]
pub fn ready_to_finalize(&self, completed: &dyn Fn(TaskId) -> bool) -> bool {
let inner = self.inner.read();
inner.children.is_empty()
&& inner.tasks.iter().all(|task| completed(*task))
&& inner.pending_obligations == 0
}
/// Overwrites local bookkeeping with an externally supplied snapshot
/// (presumably from a distributed coordination layer — TODO confirm with
/// callers). If the snapshot moves the region into `Closed` for the first
/// time, the heap is reclaimed and any close-waiter is woken.
pub fn apply_distributed_snapshot(
&self,
state: RegionState,
budget: Budget,
children: Vec<RegionId>,
tasks: Vec<TaskId>,
cancel_reason: Option<CancelReason>,
) {
let prev_state = self.state.load();
let mut inner = self.inner.write();
inner.budget = budget;
inner.children = children;
inner.tasks = tasks;
inner.cancel_reason = cancel_reason;
// Stored while `inner` is held so snapshot fields and state publish together.
self.state.store(state);
drop(inner);
if state == RegionState::Closed && prev_state != RegionState::Closed {
// `clear_heap` re-takes the inner lock, hence the drop above.
self.clear_heap();
let waker = {
let mut notify = self.close_notify.lock();
notify.closed = true;
notify.waker.take()
};
if let Some(waker) = waker {
waker.wake();
}
}
}
}
/// Trait plumbing: `RRefAccess` delegates straight to the inherent methods
/// of the same names, so trait-object and generic callers get identical
/// behavior to direct calls.
impl RRefAccess for RegionRecord {
fn rref_get<T: Clone + 'static>(&self, rref: &RRef<T>) -> Result<T, RRefError> {
self.rref_get(rref)
}
fn rref_with<T: 'static, R, F: FnOnce(&T) -> R>(
&self,
rref: &RRef<T>,
f: F,
) -> Result<R, RRefError> {
self.rref_with(rref, f)
}
fn rref_get_with<T: Clone + 'static>(
&self,
rref: &RRef<T>,
witness: RRefAccessWitness,
) -> Result<T, RRefError> {
self.rref_get_with(rref, witness)
}
fn rref_with_witness<T: 'static, R, F: FnOnce(&T) -> R>(
&self,
rref: &RRef<T>,
witness: RRefAccessWitness,
f: F,
) -> Result<R, RRefError> {
self.rref_with_witness(rref, witness, f)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::record::finalizer::Finalizer;
use crate::util::ArenaIndex;
use parking_lot::Mutex;
// Canonical region id used by most tests below.
fn test_region_id() -> RegionId {
RegionId::from_arena(ArenaIndex::new(1, 0))
}
// Forces dispatch through the `RRefAccess` trait rather than the inherent method.
fn rref_get_via_trait<A: RRefAccess, T: Clone + 'static>(accessor: &A, rref: &RRef<T>) -> T {
accessor.rref_get(rref).expect("trait get")
}
// Trait-dispatch wrapper for `rref_with`.
fn rref_with_via_trait<A: RRefAccess, T: 'static, R, F: FnOnce(&T) -> R>(
accessor: &A,
rref: &RRef<T>,
f: F,
) -> R {
accessor.rref_with(rref, f).expect("trait with")
}
// `ready_to_finalize` must stay false while any child remains, even if all
// tasks report completed and no obligations are pending.
#[test]
fn ready_to_finalize_requires_no_children() {
let region = RegionRecord::new(test_region_id(), None, Budget::INFINITE);
region
.add_child(RegionId::from_arena(ArenaIndex::new(2, 0)))
.expect("add child");
region
.add_task(TaskId::from_arena(ArenaIndex::new(3, 0)))
.expect("add task");
// No reservation was made, so this exercises the saturating no-op path.
region.resolve_obligation();
assert!(!region.ready_to_finalize(&|_task| true));
region.remove_child(RegionId::from_arena(ArenaIndex::new(2, 0)));
assert!(region.ready_to_finalize(&|_task| true));
}
// Trait-dispatch wrapper for the witness-validated getter.
fn rref_get_with_via_trait<A: RRefAccess, T: Clone + 'static>(
accessor: &A,
rref: &RRef<T>,
witness: RRefAccessWitness,
) -> T {
accessor
.rref_get_with(rref, witness)
.expect("trait get_with")
}
// Trait-dispatch wrapper for the witness-validated borrow accessor.
fn rref_with_witness_via_trait<A: RRefAccess, T: 'static, R, F: FnOnce(&T) -> R>(
accessor: &A,
rref: &RRef<T>,
witness: RRefAccessWitness,
f: F,
) -> R {
accessor
.rref_with_witness(rref, witness, f)
.expect("trait with_witness")
}
// A freshly created region starts in `Open`.
#[test]
fn region_initial_state() {
let region = RegionRecord::new(test_region_id(), None, Budget::default());
assert_eq!(region.state(), RegionState::Open);
}
// Walks the full legal lifecycle: Open → Closing → Draining → Finalizing → Closed.
#[test]
fn region_state_transitions() {
let region = RegionRecord::new(test_region_id(), None, Budget::default());
assert!(region.begin_close(None));
assert_eq!(region.state(), RegionState::Closing);
assert!(region.begin_drain());
assert_eq!(region.state(), RegionState::Draining);
assert!(region.begin_finalize());
assert_eq!(region.state(), RegionState::Finalizing);
assert!(region.complete_close());
assert_eq!(region.state(), RegionState::Closed);
}
// Out-of-order transitions fail and leave the state untouched.
#[test]
fn region_state_invalid_transitions() {
let region = RegionRecord::new(test_region_id(), None, Budget::default());
assert!(!region.begin_drain());
assert_eq!(region.state(), RegionState::Open);
assert!(!region.begin_finalize());
assert_eq!(region.state(), RegionState::Open);
assert!(!region.complete_close());
assert_eq!(region.state(), RegionState::Open);
}
// Each admission dimension (children, tasks, obligations) is enforced at its cap.
#[test]
fn region_admission_limits() {
let region = RegionRecord::new(test_region_id(), None, Budget::default());
region.set_limits(RegionLimits {
max_children: Some(1),
max_tasks: Some(2),
max_obligations: Some(1),
max_heap_bytes: None,
curve_budget: None,
});
assert!(
region
.add_child(RegionId::from_arena(ArenaIndex::new(2, 0)))
.is_ok()
);
assert!(
region
.add_child(RegionId::from_arena(ArenaIndex::new(3, 0)))
.is_err()
);
assert!(
region
.add_task(TaskId::from_arena(ArenaIndex::new(1, 0)))
.is_ok()
);
assert!(
region
.add_task(TaskId::from_arena(ArenaIndex::new(2, 0)))
.is_ok()
);
assert!(
region
.add_task(TaskId::from_arena(ArenaIndex::new(3, 0)))
.is_err()
);
assert!(region.try_reserve_obligation().is_ok());
assert!(region.try_reserve_obligation().is_err());
}
// Reserve/resolve moves the pending-obligation counter up and back down.
#[test]
fn region_obligation_tracking() {
let region = RegionRecord::new(test_region_id(), None, Budget::default());
assert_eq!(region.pending_obligations(), 0);
assert!(region.try_reserve_obligation().is_ok());
assert_eq!(region.pending_obligations(), 1);
region.resolve_obligation();
assert_eq!(region.pending_obligations(), 0);
}
// Resolving an obligation frees its slot for a subsequent reservation.
#[test]
fn region_obligation_limit_released_after_resolve() {
let region = RegionRecord::new(test_region_id(), None, Budget::default());
region.set_limits(RegionLimits {
max_obligations: Some(1),
..RegionLimits::unlimited()
});
assert!(region.try_reserve_obligation().is_ok());
assert!(matches!(
region.try_reserve_obligation(),
Err(AdmissionError::LimitReached {
kind: AdmissionKind::Obligation,
..
})
));
assert_eq!(region.pending_obligations(), 1);
region.resolve_obligation();
assert_eq!(region.pending_obligations(), 0);
assert!(region.try_reserve_obligation().is_ok());
}
// Quiescence flips off while a child exists and back on after removal.
#[test]
fn region_quiescence() {
let region = RegionRecord::new(test_region_id(), None, Budget::default());
assert!(region.is_quiescent());
region
.add_child(RegionId::from_arena(ArenaIndex::new(2, 0)))
.expect("add child");
assert!(!region.is_quiescent());
region.remove_child(RegionId::from_arena(ArenaIndex::new(2, 0)));
assert!(region.is_quiescent());
}
// Finalizers pop in LIFO order relative to registration.
#[test]
fn region_finalizer_stack() {
let region = RegionRecord::new(test_region_id(), None, Budget::default());
let log = std::sync::Arc::new(Mutex::new(Vec::new()));
region.add_finalizer(Finalizer::Sync(Box::new({
let log_ref = log.clone();
move || log_ref.lock().push("first")
})));
region.add_finalizer(Finalizer::Sync(Box::new({
let log_ref = log.clone();
move || log_ref.lock().push("second")
})));
while let Some(finalizer) = region.pop_finalizer() {
if let Finalizer::Sync(f) = finalizer {
f();
}
}
let log = log.lock().clone();
assert_eq!(log, vec!["second", "first"]); }
// Round-trips a value through heap_alloc / heap_get / heap_with.
#[test]
fn region_heap_alloc_and_access() {
let region = RegionRecord::new(test_region_id(), None, Budget::default());
let idx = region.heap_alloc(42u32).expect("heap alloc");
let value = region.heap_get::<u32>(idx).expect("heap get");
assert_eq!(value, 42);
let doubled = region.heap_with(idx, |v: &u32| v * 2).expect("heap with");
assert_eq!(doubled, 84);
}
// An allocation that would exceed max_heap_bytes is rejected with HeapBytes.
#[test]
fn region_heap_bytes_limit_enforced() {
let region = RegionRecord::new(test_region_id(), None, Budget::default());
let limit = std::mem::size_of::<u32>();
region.set_limits(RegionLimits {
max_heap_bytes: Some(limit),
..RegionLimits::unlimited()
});
let _idx = region.heap_alloc(7u32).expect("heap alloc");
let err = region.heap_alloc(1u8).expect_err("heap limit enforced");
assert!(matches!(
err,
AdmissionError::LimitReached {
kind: AdmissionKind::HeapBytes,
limit: _,
live: _
}
));
}
// begin_close records the supplied cancel reason.
#[test]
fn begin_close_with_reason() {
let region = RegionRecord::new(test_region_id(), None, Budget::default());
let reason = CancelReason::user("test shutdown");
assert!(region.begin_close(Some(reason.clone())));
assert_eq!(region.cancel_reason(), Some(reason));
}
// A closed region rejects further begin_close calls and keeps its
// original (terminal) cancel reason.
#[test]
fn begin_close_on_closed_region_preserves_terminal_cancel_reason() {
let region = RegionRecord::new(test_region_id(), None, Budget::default());
let initial_reason = CancelReason::timeout();
assert!(region.begin_close(Some(initial_reason.clone())));
assert!(region.begin_finalize());
assert!(region.complete_close());
assert_eq!(region.cancel_reason(), Some(initial_reason.clone()));
assert!(!region.begin_close(Some(CancelReason::resource_unavailable())));
assert_eq!(region.cancel_reason(), Some(initial_reason));
}
// complete_close reclaims all heap allocations.
#[test]
fn region_heap_reclaimed_on_close() {
let region = RegionRecord::new(test_region_id(), None, Budget::default());
let _idx = region.heap_alloc(42u32).expect("heap alloc");
assert_eq!(region.heap_len(), 1);
assert!(region.begin_close(None));
assert!(region.begin_finalize());
assert!(region.complete_close());
assert_eq!(region.heap_len(), 0);
}
// RRef resolution fails with RegionClosed once the region is terminal.
#[test]
fn rref_invalid_after_close() {
let region_id = test_region_id();
let region = RegionRecord::new(region_id, None, Budget::default());
let index = region.heap_alloc(123u32).expect("heap alloc");
let rref = RRef::<u32>::new(region_id, index);
assert!(region.begin_close(None));
assert!(region.begin_finalize());
assert!(region.complete_close());
let err = region
.rref_get(&rref)
.expect_err("rref should be invalid after close");
assert_eq!(err, RRefError::RegionClosed);
}
// Invalid transitions are rejected both from Open and from Draining.
#[test]
fn invalid_state_transitions_are_rejected() {
let region = RegionRecord::new(test_region_id(), None, Budget::default());
assert!(!region.begin_drain());
assert_eq!(region.state(), RegionState::Open);
assert!(!region.begin_finalize());
assert_eq!(region.state(), RegionState::Open);
assert!(!region.complete_close());
assert_eq!(region.state(), RegionState::Open);
region.begin_close(None);
region.begin_drain();
assert!(!region.complete_close());
assert_eq!(region.state(), RegionState::Draining);
}
// Finalizer count/emptiness track both sync and async registrations.
#[test]
fn finalizer_registration() {
let region = RegionRecord::new(test_region_id(), None, Budget::default());
assert!(region.finalizers_empty());
assert_eq!(region.finalizer_count(), 0);
region.add_finalizer(Finalizer::Sync(Box::new(|| {})));
assert!(!region.finalizers_empty());
assert_eq!(region.finalizer_count(), 1);
region.add_finalizer(Finalizer::Async(Box::pin(async {})));
assert_eq!(region.finalizer_count(), 2);
}
// Three finalizers run in reverse registration order.
#[test]
fn finalizer_lifo_order() {
let region = RegionRecord::new(test_region_id(), None, Budget::default());
let order = std::sync::Arc::new(Mutex::new(Vec::new()));
let o1 = order.clone();
let o2 = order.clone();
let o3 = order.clone();
region.add_finalizer(Finalizer::Sync(Box::new(move || {
o1.lock().push(1);
})));
region.add_finalizer(Finalizer::Sync(Box::new(move || {
o2.lock().push(2);
})));
region.add_finalizer(Finalizer::Sync(Box::new(move || {
o3.lock().push(3);
})));
while let Some(finalizer) = region.pop_finalizer() {
if let Finalizer::Sync(f) = finalizer {
f();
}
}
assert_eq!(*order.lock(), vec![3, 2, 1]);
}
// Popping an empty finalizer stack yields None, before and after use.
#[test]
fn finalizer_pop_returns_none_when_empty() {
let region = RegionRecord::new(test_region_id(), None, Budget::default());
assert!(region.pop_finalizer().is_none());
region.add_finalizer(Finalizer::Sync(Box::new(|| {})));
assert!(region.pop_finalizer().is_some());
assert!(region.pop_finalizer().is_none());
}
// While Closing, all three admission kinds are refused with Closed.
#[test]
fn admission_rejected_when_closing() {
let region = RegionRecord::new(test_region_id(), None, Budget::default());
region.begin_close(None);
assert_eq!(region.state(), RegionState::Closing);
let task = TaskId::from_arena(ArenaIndex::new(1, 0));
assert_eq!(region.add_task(task), Err(AdmissionError::Closed));
let child = RegionId::from_arena(ArenaIndex::new(1, 0));
assert_eq!(region.add_child(child), Err(AdmissionError::Closed));
assert_eq!(region.try_reserve_obligation(), Err(AdmissionError::Closed));
}
// Draining refuses task admission as well.
#[test]
fn admission_rejected_when_draining() {
let region = RegionRecord::new(test_region_id(), None, Budget::default());
region.begin_close(None);
region.begin_drain();
assert_eq!(region.state(), RegionState::Draining);
let task = TaskId::from_arena(ArenaIndex::new(1, 0));
assert_eq!(region.add_task(task), Err(AdmissionError::Closed));
}
// Finalizing still admits tasks and obligations (cleanup work).
#[test]
fn admission_allowed_when_finalizing() {
let region = RegionRecord::new(test_region_id(), None, Budget::default());
region.begin_close(None);
assert!(region.begin_finalize()); assert_eq!(region.state(), RegionState::Finalizing);
let task = TaskId::from_arena(ArenaIndex::new(1, 0));
assert!(region.add_task(task).is_ok());
assert!(region.try_reserve_obligation().is_ok());
}
// add_cleanup_task skips max_tasks while Finalizing; add_task does not.
#[test]
fn cleanup_task_bypasses_task_limit_when_finalizing() {
let region = RegionRecord::new(test_region_id(), None, Budget::default());
region.set_limits(RegionLimits {
max_tasks: Some(0),
..RegionLimits::unlimited()
});
region.begin_close(None);
assert!(region.begin_finalize());
let task = TaskId::from_arena(ArenaIndex::new(1, 0));
assert_eq!(
region.add_task(task),
Err(AdmissionError::LimitReached {
kind: AdmissionKind::Task,
limit: 0,
live: 0,
})
);
assert!(region.add_cleanup_task(task).is_ok());
}
#[test]
fn child_admission_rejected_when_finalizing() {
let region = RegionRecord::new(test_region_id(), None, Budget::default());
region.begin_close(None);
assert!(region.begin_finalize()); assert_eq!(region.state(), RegionState::Finalizing);
let child = RegionId::from_arena(ArenaIndex::new(1, 0));
assert_eq!(region.add_child(child), Err(AdmissionError::Closed));
}
#[test]
fn admission_rejected_when_closed() {
let region = RegionRecord::new(test_region_id(), None, Budget::default());
region.begin_close(None);
region.begin_finalize();
region.complete_close();
assert_eq!(region.state(), RegionState::Closed);
let task = TaskId::from_arena(ArenaIndex::new(1, 0));
assert_eq!(region.add_task(task), Err(AdmissionError::Closed));
}
#[test]
fn add_task_idempotent() {
let region = RegionRecord::new(test_region_id(), None, Budget::default());
region.set_limits(RegionLimits {
max_tasks: Some(1),
..RegionLimits::unlimited()
});
let task = TaskId::from_arena(ArenaIndex::new(1, 0));
assert!(region.add_task(task).is_ok());
assert!(region.add_task(task).is_ok());
assert_eq!(region.task_ids().len(), 1);
}
#[test]
fn add_child_idempotent() {
let region = RegionRecord::new(test_region_id(), None, Budget::default());
region.set_limits(RegionLimits {
max_children: Some(1),
..RegionLimits::unlimited()
});
let child = RegionId::from_arena(ArenaIndex::new(1, 0));
assert!(region.add_child(child).is_ok());
assert!(region.add_child(child).is_ok());
assert_eq!(region.child_ids().len(), 1);
}
#[test]
fn remove_task_frees_slot() {
let region = RegionRecord::new(test_region_id(), None, Budget::default());
region.set_limits(RegionLimits {
max_tasks: Some(1),
..RegionLimits::unlimited()
});
let task1 = TaskId::from_arena(ArenaIndex::new(1, 0));
let task2 = TaskId::from_arena(ArenaIndex::new(2, 0));
assert!(region.add_task(task1).is_ok());
assert!(region.add_task(task2).is_err());
region.remove_task(task1);
assert!(region.add_task(task2).is_ok());
}
#[test]
fn remove_child_frees_slot() {
let region = RegionRecord::new(test_region_id(), None, Budget::default());
region.set_limits(RegionLimits {
max_children: Some(1),
..RegionLimits::unlimited()
});
let child1 = RegionId::from_arena(ArenaIndex::new(1, 0));
let child2 = RegionId::from_arena(ArenaIndex::new(2, 0));
assert!(region.add_child(child1).is_ok());
assert!(region.add_child(child2).is_err());
region.remove_child(child1);
assert!(region.add_child(child2).is_ok());
}
#[test]
fn unlimited_admits_many_tasks() {
let region = RegionRecord::new(test_region_id(), None, Budget::default());
assert_eq!(region.limits(), RegionLimits::UNLIMITED);
for i in 0..100 {
let task = TaskId::from_arena(ArenaIndex::new(i, 0));
assert!(region.add_task(task).is_ok());
}
assert_eq!(region.task_ids().len(), 100);
}
#[test]
fn unlimited_admits_many_obligations() {
let region = RegionRecord::new(test_region_id(), None, Budget::default());
for _ in 0..100 {
assert!(region.try_reserve_obligation().is_ok());
}
assert_eq!(region.pending_obligations(), 100);
}
#[test]
fn resolve_obligation_saturates_at_zero() {
    // Resolving with nothing pending saturates at zero rather than underflowing.
    let region = RegionRecord::new(test_region_id(), None, Budget::default());
    assert_eq!(region.pending_obligations(), 0);
    region.resolve_obligation();
    assert_eq!(region.pending_obligations(), 0);
    // One reserve/resolve round trip, then one extra resolve beyond it.
    assert!(region.try_reserve_obligation().is_ok());
    assert_eq!(region.pending_obligations(), 1);
    region.resolve_obligation();
    assert_eq!(region.pending_obligations(), 0);
    region.resolve_obligation();
    assert_eq!(region.pending_obligations(), 0);
}
#[test]
fn admission_error_carries_exact_counts() {
    // A limit rejection must report the configured limit and the live count.
    let region = RegionRecord::new(test_region_id(), None, Budget::default());
    region.set_limits(RegionLimits {
        max_tasks: Some(3),
        ..RegionLimits::unlimited()
    });
    for i in 0..3 {
        let admitted = TaskId::from_arena(ArenaIndex::new(i, 0));
        assert!(region.add_task(admitted).is_ok());
    }
    let overflow = TaskId::from_arena(ArenaIndex::new(99, 0));
    match region
        .add_task(overflow)
        .expect_err("expected admission error")
    {
        AdmissionError::LimitReached { kind, limit, live } => {
            assert_eq!(kind, AdmissionKind::Task);
            assert_eq!((limit, live), (3, 3));
        }
        AdmissionError::Closed => unreachable!("expected LimitReached, got Closed"),
    }
}
#[test]
fn has_live_work_tracks_all_categories() {
    // Tasks, children, and obligations each independently count as live work.
    let region = RegionRecord::new(test_region_id(), None, Budget::default());
    assert!(!region.has_live_work());
    // Task category toggles the flag.
    let task_id = TaskId::from_arena(ArenaIndex::new(1, 0));
    assert!(region.add_task(task_id).is_ok());
    assert!(region.has_live_work());
    region.remove_task(task_id);
    assert!(!region.has_live_work());
    // Child category toggles the flag.
    let child_id = RegionId::from_arena(ArenaIndex::new(1, 0));
    assert!(region.add_child(child_id).is_ok());
    assert!(region.has_live_work());
    region.remove_child(child_id);
    assert!(!region.has_live_work());
    // Obligation category toggles the flag.
    assert!(region.try_reserve_obligation().is_ok());
    assert!(region.has_live_work());
    region.resolve_obligation();
    assert!(!region.has_live_work());
}
#[test]
fn heap_admits_up_to_exact_limit() {
    // A byte budget of exactly two u32 cells admits two and rejects the third.
    let region = RegionRecord::new(test_region_id(), None, Budget::default());
    let cell = std::mem::size_of::<u32>();
    region.set_limits(RegionLimits {
        max_heap_bytes: Some(cell * 2),
        ..RegionLimits::unlimited()
    });
    assert!(region.heap_alloc(1u32).is_ok());
    assert!(region.heap_alloc(2u32).is_ok());
    let rejection = region.heap_alloc(3u32).expect_err("heap limit");
    assert!(matches!(
        rejection,
        AdmissionError::LimitReached {
            kind: AdmissionKind::HeapBytes,
            ..
        }
    ));
}
#[test]
fn close_prevents_subsequent_admission() {
    // Once close begins, tasks, children, and obligations are all refused.
    let region = RegionRecord::new(test_region_id(), None, Budget::default());
    let live_task = TaskId::from_arena(ArenaIndex::new(1, 0));
    assert!(region.add_task(live_task).is_ok());
    region.begin_close(None);
    let late_task = TaskId::from_arena(ArenaIndex::new(2, 0));
    assert_eq!(region.add_task(late_task), Err(AdmissionError::Closed));
    let late_child = RegionId::from_arena(ArenaIndex::new(1, 0));
    assert_eq!(region.add_child(late_child), Err(AdmissionError::Closed));
    assert_eq!(region.try_reserve_obligation(), Err(AdmissionError::Closed));
}
#[test]
fn saturated_limit_rejects_all_types() {
// Saturate every admission category at once, then verify each category's
// rejection carries its own `AdmissionKind`.
let region = RegionRecord::new(test_region_id(), None, Budget::default());
region.set_limits(RegionLimits {
max_tasks: Some(2),
max_children: Some(2),
max_obligations: Some(2),
max_heap_bytes: Some(std::mem::size_of::<u64>()),
curve_budget: None,
});
// Fill the task quota.
let t1 = TaskId::from_arena(ArenaIndex::new(1, 0));
let t2 = TaskId::from_arena(ArenaIndex::new(2, 0));
assert!(region.add_task(t1).is_ok());
assert!(region.add_task(t2).is_ok());
// Fill the child quota.
let c1 = RegionId::from_arena(ArenaIndex::new(1, 0));
let c2 = RegionId::from_arena(ArenaIndex::new(2, 0));
assert!(region.add_child(c1).is_ok());
assert!(region.add_child(c2).is_ok());
// Fill the obligation quota and the heap byte budget (exactly one u64).
assert!(region.try_reserve_obligation().is_ok());
assert!(region.try_reserve_obligation().is_ok());
assert!(region.heap_alloc(42u64).is_ok());
// Every category must now reject, each with its matching kind.
let t3 = TaskId::from_arena(ArenaIndex::new(3, 0));
assert!(matches!(
region.add_task(t3),
Err(AdmissionError::LimitReached {
kind: AdmissionKind::Task,
..
})
));
let c3 = RegionId::from_arena(ArenaIndex::new(3, 0));
assert!(matches!(
region.add_child(c3),
Err(AdmissionError::LimitReached {
kind: AdmissionKind::Child,
..
})
));
assert!(matches!(
region.try_reserve_obligation(),
Err(AdmissionError::LimitReached {
kind: AdmissionKind::Obligation,
..
})
));
// Even a single byte is over the exhausted heap budget.
assert!(matches!(
region.heap_alloc(1u8),
Err(AdmissionError::LimitReached {
kind: AdmissionKind::HeapBytes,
..
})
));
}
#[test]
fn heap_stats_return_to_zero_single_region() {
    // Completing the close reclaims every live heap cell and records it.
    let region = RegionRecord::new(test_region_id(), None, Budget::default());
    region.heap_alloc(1u32).unwrap();
    region.heap_alloc(2u64).unwrap();
    region.heap_alloc("hello".to_string()).unwrap();
    let before = region.heap_stats();
    assert_eq!((before.live, before.allocations), (3, 3));
    assert!(region.begin_close(None));
    assert!(region.begin_finalize());
    assert!(region.complete_close());
    let after = region.heap_stats();
    assert_eq!(after.live, 0);
    assert_eq!(after.reclaimed, 3);
    assert_eq!(region.heap_len(), 0);
}
#[test]
fn multi_region_hierarchy_reclamation() {
// Heap reclamation is strictly per-region: closing a child reclaims only
// that child's cells, and the parent's heap is untouched until it closes.
let parent_id = RegionId::from_arena(ArenaIndex::new(100, 0));
let child1_id = RegionId::from_arena(ArenaIndex::new(101, 0));
let child2_id = RegionId::from_arena(ArenaIndex::new(102, 0));
let parent = RegionRecord::new(parent_id, None, Budget::default());
let child1 = RegionRecord::new(child1_id, Some(parent_id), Budget::default());
let child2 = RegionRecord::new(child2_id, Some(parent_id), Budget::default());
parent.add_child(child1_id).unwrap();
parent.add_child(child2_id).unwrap();
// Spread allocations across the hierarchy: 2 in parent, 1 + 2 in children.
parent.heap_alloc(10u32).unwrap();
parent.heap_alloc(20u32).unwrap();
child1.heap_alloc(30u64).unwrap();
child2.heap_alloc(40u64).unwrap();
child2.heap_alloc(50u64).unwrap();
assert_eq!(parent.heap_stats().live, 2);
assert_eq!(child1.heap_stats().live, 1);
assert_eq!(child2.heap_stats().live, 2);
// Close child1: exactly its one cell is reclaimed.
assert!(child1.begin_close(None));
assert!(child1.begin_finalize());
assert!(child1.complete_close());
assert_eq!(child1.heap_stats().live, 0);
assert_eq!(child1.heap_stats().reclaimed, 1);
// Close child2: exactly its two cells are reclaimed.
assert!(child2.begin_close(None));
assert!(child2.begin_finalize());
assert!(child2.complete_close());
assert_eq!(child2.heap_stats().live, 0);
assert_eq!(child2.heap_stats().reclaimed, 2);
// The parent's cells survived both child closes.
assert_eq!(parent.heap_stats().live, 2);
// Detach the closed children, then run the parent's own close sequence.
parent.remove_child(child1_id);
parent.remove_child(child2_id);
assert!(parent.begin_close(None));
assert!(parent.begin_finalize());
assert!(parent.complete_close());
assert_eq!(parent.heap_stats().live, 0);
assert_eq!(parent.heap_stats().reclaimed, 2);
}
#[test]
fn heap_alloc_allowed_during_cleanup_phases() {
    // Cleanup code may still allocate during Closing and Finalizing; the
    // whole heap is reclaimed only when the close completes.
    let region = RegionRecord::new(test_region_id(), None, Budget::default());
    region.heap_alloc(42u32).unwrap();
    assert_eq!(region.heap_len(), 1);
    assert!(region.begin_close(None));
    // Allocation still works while Closing.
    region.heap_alloc(99u32).unwrap();
    assert_eq!(region.heap_len(), 2);
    assert!(region.begin_finalize());
    // Allocation still works while Finalizing.
    region.heap_alloc(200u32).unwrap();
    assert_eq!(region.heap_len(), 3);
    assert!(region.complete_close());
    assert_eq!(region.heap_len(), 0);
}
#[test]
fn heap_alloc_rejected_when_closed() {
    // A fully closed region refuses any further heap allocation.
    let region = RegionRecord::new(test_region_id(), None, Budget::default());
    assert!(region.begin_close(None));
    assert!(region.begin_finalize());
    assert!(region.complete_close());
    assert_eq!(region.heap_alloc(42u32), Err(AdmissionError::Closed));
}
#[test]
fn heap_reclamation_timing_matches_state_machine() {
    // Heap cells survive Closing/Draining/Finalizing and vanish only at Closed.
    let region = RegionRecord::new(test_region_id(), None, Budget::default());
    region.heap_alloc(1u32).unwrap();
    region.heap_alloc(2u64).unwrap();
    // Helper: assert the (heap length, lifecycle state) pair after a step.
    let expect = |len: usize, state: RegionState| {
        assert_eq!(region.heap_len(), len);
        assert_eq!(region.state(), state);
    };
    assert!(region.begin_close(None));
    expect(2, RegionState::Closing);
    assert!(region.begin_drain());
    expect(2, RegionState::Draining);
    assert!(region.begin_finalize());
    expect(2, RegionState::Finalizing);
    assert!(region.complete_close());
    expect(0, RegionState::Closed);
}
#[test]
fn heap_stats_consistent_through_lifecycle() {
    // `allocations` is cumulative; `live` drains into `reclaimed` at close.
    let region = RegionRecord::new(test_region_id(), None, Budget::default());
    region.heap_alloc(42u32).unwrap();
    region.heap_alloc(std::f64::consts::PI).unwrap();
    region.heap_alloc(vec![1u8, 2, 3]).unwrap();
    let open_stats = region.heap_stats();
    assert_eq!(open_stats.allocations, 3);
    assert_eq!(open_stats.live, 3);
    assert_eq!(open_stats.reclaimed, 0);
    assert!(region.begin_close(None));
    assert!(region.begin_finalize());
    assert!(region.complete_close());
    let closed_stats = region.heap_stats();
    assert_eq!(closed_stats.allocations, 3);
    assert_eq!(closed_stats.live, 0);
    assert_eq!(closed_stats.reclaimed, 3);
}
#[test]
fn rref_accessible_through_finalizing_invalid_after_closed() {
    // RRef reads stay valid through every pre-Closed phase, then fail.
    let rid = test_region_id();
    let region = RegionRecord::new(rid, None, Budget::default());
    let slot = region.heap_alloc(99u32).unwrap();
    let handle = RRef::<u32>::new(rid, slot);
    assert_eq!(region.rref_get(&handle).unwrap(), 99);
    assert!(region.begin_close(None));
    assert_eq!(region.rref_get(&handle).unwrap(), 99);
    assert!(region.begin_drain());
    assert_eq!(region.rref_get(&handle).unwrap(), 99);
    assert!(region.begin_finalize());
    assert_eq!(region.rref_get(&handle).unwrap(), 99);
    assert!(region.complete_close());
    assert_eq!(
        region.rref_get(&handle).expect_err("invalid after close"),
        RRefError::RegionClosed
    );
}
#[test]
fn complete_close_is_idempotent_for_reclamation() {
    // A second complete_close reports false and must not double-count
    // reclaimed cells.
    let region = RegionRecord::new(test_region_id(), None, Budget::default());
    region.heap_alloc(1u32).unwrap();
    region.heap_alloc(2u32).unwrap();
    assert!(region.begin_close(None));
    assert!(region.begin_finalize());
    assert!(region.complete_close());
    assert_eq!(region.heap_stats().live, 0);
    assert_eq!(region.heap_stats().reclaimed, 2);
    // Second attempt is a no-op.
    assert!(!region.complete_close());
    assert_eq!(region.heap_stats().reclaimed, 2);
}
#[test]
fn interleaved_add_remove_never_over_admits() {
    // Repeated fill/overflow/drain cycles never leak capacity past the limit.
    let region = RegionRecord::new(test_region_id(), None, Budget::default());
    region.set_limits(RegionLimits {
        max_tasks: Some(3),
        ..RegionLimits::unlimited()
    });
    for round in 0..10u32 {
        let base = round * 3;
        let trio: Vec<_> = (0..3)
            .map(|offset| TaskId::from_arena(ArenaIndex::new(base + offset, 0)))
            .collect();
        for task in &trio {
            assert!(region.add_task(*task).is_ok());
        }
        // The fourth admission in each round must be rejected.
        let overflow = TaskId::from_arena(ArenaIndex::new(base + 3, 0));
        assert!(region.add_task(overflow).is_err());
        // Drain the round completely before the next one.
        for task in &trio {
            region.remove_task(*task);
        }
        assert_eq!(region.task_ids().len(), 0);
    }
}
#[test]
fn access_witness_available_while_open() {
    // An open region hands out a witness scoped to its own id.
    let region = RegionRecord::new(test_region_id(), None, Budget::INFINITE);
    let witness = region.access_witness().expect("witness while open");
    assert_eq!(witness.region(), test_region_id());
}
#[test]
fn access_witness_available_through_closing_phases() {
    // A witness can be minted in Open, Closing, Draining, and Finalizing.
    let region = RegionRecord::new(test_region_id(), None, Budget::INFINITE);
    let mint = |phase: &str| assert!(region.access_witness().is_ok(), "{phase}");
    mint("open");
    region.begin_close(None);
    mint("closing");
    region.begin_drain();
    mint("draining");
    region.begin_finalize();
    mint("finalizing");
}
#[test]
fn access_witness_denied_after_close() {
    // After the full close sequence, witness requests report RegionClosed.
    let region = RegionRecord::new(test_region_id(), None, Budget::INFINITE);
    region.begin_close(None);
    region.begin_drain();
    region.begin_finalize();
    region.complete_close();
    assert_eq!(
        region.access_witness().expect_err("closed region"),
        RRefError::RegionClosed
    );
}
#[test]
fn witness_gated_get_succeeds_with_matching_region() {
    // A witness minted by the owning region unlocks rref_get_with.
    let region_id = test_region_id();
    let region = RegionRecord::new(region_id, None, Budget::INFINITE);
    let slot = region.heap_alloc(42u32).expect("heap alloc");
    let handle = RRef::<u32>::new(region_id, slot);
    let witness = region.access_witness().expect("witness");
    assert_eq!(region.rref_get_with(&handle, witness).expect("get_with"), 42);
}
#[test]
fn witness_gated_with_succeeds_with_matching_region() {
    // rref_with_witness applies a closure to the referent under a witness.
    let region_id = test_region_id();
    let region = RegionRecord::new(region_id, None, Budget::INFINITE);
    let slot = region.heap_alloc("hello".to_string()).expect("heap alloc");
    let handle = RRef::<String>::new(region_id, slot);
    let witness = region.access_witness().expect("witness");
    let observed = region
        .rref_with_witness(&handle, witness, |s| s.len())
        .expect("with_witness");
    assert_eq!(observed, 5);
}
#[test]
fn witness_from_wrong_region_rejected() {
    // A witness minted by a different region must not grant access.
    let owner_id = test_region_id();
    let other_id = RegionId::from_arena(ArenaIndex::new(99, 0));
    let owner = RegionRecord::new(owner_id, None, Budget::INFINITE);
    let other = RegionRecord::new(other_id, None, Budget::INFINITE);
    let slot = owner.heap_alloc(7u32).expect("heap alloc");
    let handle = RRef::<u32>::new(owner_id, slot);
    let foreign_witness = other.access_witness().expect("witness");
    assert_eq!(
        owner.rref_get_with(&handle, foreign_witness).unwrap_err(),
        RRefError::WrongRegion
    );
}
#[test]
fn witness_rref_region_mismatch_rejected() {
    // An RRef stamped with a foreign region id fails even with a local witness.
    let local_id = test_region_id();
    let foreign_id = RegionId::from_arena(ArenaIndex::new(99, 0));
    let region = RegionRecord::new(local_id, None, Budget::INFINITE);
    let slot = region.heap_alloc(7u32).expect("heap alloc");
    let mismatched = RRef::<u32>::new(foreign_id, slot);
    let witness = region.access_witness().expect("witness");
    assert_eq!(
        region.rref_get_with(&mismatched, witness).unwrap_err(),
        RRefError::WrongRegion
    );
}
#[test]
fn stale_witness_rejected_after_close() {
    // A witness captured before close must not outlive the region.
    let region_id = test_region_id();
    let region = RegionRecord::new(region_id, None, Budget::INFINITE);
    let slot = region.heap_alloc(42u32).expect("heap alloc");
    let handle = RRef::<u32>::new(region_id, slot);
    let stale = region.access_witness().expect("witness");
    region.begin_close(None);
    region.begin_drain();
    region.begin_finalize();
    region.complete_close();
    assert_eq!(
        region.rref_get_with(&handle, stale).unwrap_err(),
        RRefError::RegionClosed
    );
}
#[test]
fn rref_access_trait_get_works() {
    // The get helper works when dispatched through the RRefAccess trait.
    let region_id = test_region_id();
    let region = RegionRecord::new(region_id, None, Budget::INFINITE);
    let slot = region.heap_alloc(99i64).expect("heap alloc");
    let handle = RRef::<i64>::new(region_id, slot);
    assert_eq!(rref_get_via_trait(&region, &handle), 99);
}
#[test]
fn rref_access_trait_with_works() {
    // The closure helper works when dispatched through the RRefAccess trait.
    let region_id = test_region_id();
    let region = RegionRecord::new(region_id, None, Budget::INFINITE);
    let slot = region.heap_alloc(vec![1, 2, 3]).expect("heap alloc");
    let handle = RRef::<Vec<i32>>::new(region_id, slot);
    assert_eq!(rref_with_via_trait(&region, &handle, Vec::len), 3);
}
#[test]
fn rref_access_trait_witness_methods_work() {
    // Witness-gated trait helpers return the value and the closure result.
    let region_id = test_region_id();
    let region = RegionRecord::new(region_id, None, Budget::INFINITE);
    let slot = region
        .heap_alloc("witness".to_string())
        .expect("heap alloc");
    let handle = RRef::<String>::new(region_id, slot);
    let witness = region.access_witness().expect("witness");
    assert_eq!(rref_get_with_via_trait(&region, &handle, witness), "witness");
    assert_eq!(
        rref_with_witness_via_trait(&region, &handle, witness, String::len),
        7
    );
}
#[test]
fn admission_kind_debug_clone_copy_eq() {
    // AdmissionKind must be Debug-printable, Copy/Clone, and Eq-comparable.
    let kind = AdmissionKind::Task;
    let rendered = format!("{kind:?}");
    assert!(rendered.contains("Task"), "{rendered}");
    let by_copy: AdmissionKind = kind;
    let by_clone = kind;
    assert_eq!(by_copy, by_clone);
    assert_ne!(kind, AdmissionKind::Child);
}
#[test]
fn admission_error_debug_clone_copy_eq() {
    // AdmissionError must be Debug-printable, Copy/Clone, and Eq-comparable.
    let closed = AdmissionError::Closed;
    let rendered = format!("{closed:?}");
    assert!(rendered.contains("Closed"), "{rendered}");
    let by_copy: AdmissionError = closed;
    let by_clone = closed;
    assert_eq!(by_copy, by_clone);
    // The struct-like variant renders its name and compares unequal to Closed.
    let limit_hit = AdmissionError::LimitReached {
        kind: AdmissionKind::HeapBytes,
        limit: 1024,
        live: 1024,
    };
    let rendered_hit = format!("{limit_hit:?}");
    assert!(rendered_hit.contains("LimitReached"), "{rendered_hit}");
    assert_ne!(closed, limit_hit);
}
#[test]
fn region_limits_debug_clone_default_eq() {
    // Default limits equal the UNLIMITED constant; Debug/Clone/Eq all work.
    let limits = RegionLimits::default();
    assert_eq!(limits, RegionLimits::UNLIMITED);
    let rendered = format!("{limits:?}");
    assert!(rendered.contains("RegionLimits"), "{rendered}");
    let duplicate = limits.clone();
    assert_eq!(limits, duplicate);
}
#[test]
fn obligation_bounded_by_region_limit() {
    crate::test_utils::init_test_logging();
    crate::test_phase!("obligation_bounded_by_region_limit");
    // Reservations succeed up to the configured bound, reject with exact
    // counts at the bound, and a resolve frees exactly one slot.
    let region = RegionRecord::new(test_region_id(), None, Budget::default());
    let bound: usize = 5;
    region.set_limits(RegionLimits {
        max_obligations: Some(bound),
        ..RegionLimits::unlimited()
    });
    for i in 0..bound {
        assert!(
            region.try_reserve_obligation().is_ok(),
            "reserve {i} should succeed within bound {bound}"
        );
    }
    assert_eq!(region.pending_obligations(), bound);
    let rejection = region
        .try_reserve_obligation()
        .expect_err("expected rejection at bound");
    match rejection {
        AdmissionError::LimitReached { kind, limit, live } => {
            assert_eq!(kind, AdmissionKind::Obligation);
            assert_eq!((limit, live), (bound, bound));
        }
        AdmissionError::Closed => unreachable!("expected LimitReached, got Closed"),
    }
    region.resolve_obligation();
    assert_eq!(region.pending_obligations(), bound - 1);
    assert!(
        region.try_reserve_obligation().is_ok(),
        "should succeed after resolving one"
    );
    assert_eq!(region.pending_obligations(), bound);
}
}