use super::region_table::RegionCreateError;
use crate::cancel::protocol_state_machines::{
CancelProtocolValidator, ObligationContext, ObligationEvent, RegionContext, RegionEvent,
TaskContext, TaskEvent, TransitionResult, ValidationLevel as CancelValidationLevel,
};
use crate::cx::cx::ObservabilityState;
use crate::cx::scope::{CatchUnwind, payload_to_string};
use crate::epoch::EpochId;
use crate::error::{Error, ErrorKind};
use crate::observability::metrics::{MetricsProvider, NoOpMetrics, OutcomeKind};
use crate::observability::{LogCollector, ObservabilityConfig};
use crate::record::{
AdmissionError, ObligationAbortReason, ObligationKind, ObligationRecord, ObligationState,
RegionLimits, RegionRecord, SourceLocation, TaskRecord,
finalizer::{FINALIZER_TIME_BUDGET_NANOS, Finalizer, finalizer_budget},
region::RegionState,
task::TaskState,
};
use crate::runtime::config::{LeakEscalation, ObligationLeakResponse};
use crate::runtime::io_driver::{IoDriver, IoDriverHandle};
use crate::runtime::reactor::Reactor;
use crate::runtime::resource_monitor::{
DegradationLevel, DegradationStatsSnapshot, MonitorConfig, RegionPriority, ResourceMonitor,
};
use crate::runtime::stored_task::StoredTask;
use crate::runtime::task_handle::JoinError;
use crate::runtime::{BlockingPoolHandle, ObligationTable, RegionTable, TaskTable};
use crate::time::TimerDriverHandle;
use crate::trace::distributed::{LogicalClockMode, LogicalTime};
use crate::trace::event::{TraceData, TraceEventKind};
use crate::trace::{TraceBufferHandle, TraceEvent};
use crate::tracing_compat::{debug, debug_span, trace, trace_span};
use crate::types::policy::PolicyAction;
use crate::types::task_context::{CxInner, MAX_MASK_DEPTH};
use crate::types::{
Budget, CancelAttributionConfig, CancelKind, CancelReason, ObligationId, Outcome, Policy,
RegionId, TaskId, Time,
};
use crate::util::{Arena, ArenaIndex, EntropySource, OsEntropy};
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use std::backtrace::Backtrace;
use std::collections::{HashMap, HashSet, VecDeque};
use std::fmt;
use std::future::Future;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::task::Poll;
use std::time::Duration;
static NEXT_RUNTIME_INSTANCE_ID: AtomicU64 = AtomicU64::new(1);
type BoxedAsyncFinalizer = std::pin::Pin<Box<dyn Future<Output = ()> + Send>>;
fn log_cancel_protocol_violation(operation: &'static str, validation_result: &TransitionResult) {
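    // Keep the parameters "used" even when the tracing_compat macro below
    // expands to nothing (e.g. with tracing support compiled out).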
let _ = operation;
let _ = validation_result;
crate::tracing_compat::error!(
operation,
validation_result = ?validation_result,
"cancel protocol violation"
);
}
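/// Bookkeeping events recorded in `finalizer_history`: finalizer registration,
/// finalizer execution, and region close, each stamped with the runtime time.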
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum FinalizerHistoryEvent {
Registered {
id: u64,
region: RegionId,
time: Time,
},
Ran {
id: u64,
time: Time,
},
RegionClosed {
region: RegionId,
time: Time,
},
}
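/// Why a spawn request was rejected before its future was ever stored.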
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SpawnError {
RuntimeUnavailable,
RegionNotFound(RegionId),
RegionClosed(RegionId),
LocalSchedulerUnavailable,
NameRegistrationFailed {
name: String,
reason: String,
},
RegionAtCapacity {
region: RegionId,
limit: usize,
live: usize,
},
}
impl std::fmt::Display for SpawnError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::RuntimeUnavailable => write!(f, "runtime is no longer available"),
Self::RegionNotFound(id) => write!(f, "region not found: {id:?}"),
Self::RegionClosed(id) => write!(f, "region closed: {id:?}"),
Self::LocalSchedulerUnavailable => {
write!(f, "local spawn requires an active worker scheduler")
}
Self::NameRegistrationFailed { name, reason } => {
write!(f, "name registration failed: name={name} reason={reason}")
}
Self::RegionAtCapacity {
region,
limit,
live,
} => write!(
f,
"region admission limit reached: region={region:?} limit={limit} live={live}"
),
}
}
}
impl std::error::Error for SpawnError {}
#[derive(Debug, Clone, Copy)]
enum TaskCompletionKind {
Ok,
Err,
Cancelled,
Panicked,
Unknown,
}
impl TaskCompletionKind {
fn from_state(state: &TaskState) -> Self {
match state {
TaskState::Completed(Outcome::Ok(())) => Self::Ok,
TaskState::Completed(Outcome::Err(_)) => Self::Err,
TaskState::Completed(Outcome::Cancelled(_)) => Self::Cancelled,
TaskState::Completed(Outcome::Panicked(_)) => Self::Panicked,
_ => Self::Unknown,
}
}
const fn as_str(self) -> &'static str {
match self {
Self::Ok => "ok",
Self::Err => "err",
Self::Cancelled => "cancelled",
Self::Panicked => "panicked",
Self::Unknown => "unknown",
}
}
}
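/// Runs an async finalizer with the owning task's cancellation mask held.
///
/// The first poll increments `mask_depth` on the shared `CxInner` (bounded by
/// `MAX_MASK_DEPTH` per INV-MASK-BOUNDED), and the mask is released when the
/// finalizer completes or the wrapper is dropped, so cleanup code does not
/// observe cancellation part-way through.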
struct MaskedFinalizer {
inner: BoxedAsyncFinalizer,
cx_inner: Arc<parking_lot::RwLock<CxInner>>,
entered: bool,
}
impl MaskedFinalizer {
fn new(inner: BoxedAsyncFinalizer, cx_inner: Arc<parking_lot::RwLock<CxInner>>) -> Self {
Self {
inner,
cx_inner,
entered: false,
}
}
fn enter_mask(&mut self) {
if self.entered {
return;
}
let mut guard = self.cx_inner.write();
debug_assert!(
guard.mask_depth < MAX_MASK_DEPTH,
"mask depth exceeded MAX_MASK_DEPTH ({MAX_MASK_DEPTH}): this violates INV-MASK-BOUNDED \
and prevents cancellation from ever being observed. \
Reduce nesting of masked sections.",
);
if guard.mask_depth >= MAX_MASK_DEPTH {
crate::tracing_compat::error!(
depth = guard.mask_depth,
max = MAX_MASK_DEPTH,
"INV-MASK-BOUNDED violated: mask depth saturated, cancellation may be unobservable"
);
return;
}
guard.mask_depth += 1;
drop(guard);
self.entered = true;
}
fn exit_mask(&mut self) {
if !self.entered {
return;
}
let mut guard = self.cx_inner.write();
guard.mask_depth = guard.mask_depth.saturating_sub(1);
drop(guard);
self.entered = false;
}
}
impl Future for MaskedFinalizer {
type Output = ();
fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<()> {
self.enter_mask();
let poll = self.inner.as_mut().poll(cx);
if poll.is_ready() {
self.exit_mask();
}
poll
}
}
impl Drop for MaskedFinalizer {
fn drop(&mut self) {
self.exit_mask();
}
}
impl Unpin for MaskedFinalizer {}
#[derive(Debug, Clone)]
struct LeakedObligationInfo {
id: ObligationId,
kind: ObligationKind,
holder: TaskId,
region: RegionId,
acquired_at: SourceLocation,
held_duration_ns: u64,
description: Option<String>,
#[allow(dead_code)]
acquire_backtrace: Option<Arc<Backtrace>>,
}
impl fmt::Display for LeakedObligationInfo {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{:?} {:?} holder={:?} region={:?} acquired_at={} held_ns={}",
self.id, self.kind, self.holder, self.region, self.acquired_at, self.held_duration_ns
)?;
if let Some(desc) = &self.description {
write!(f, " desc={desc}")?;
}
Ok(())
}
}
#[derive(Debug, Clone)]
struct ObligationLeakError {
task_id: Option<TaskId>,
region_id: RegionId,
completion: Option<TaskCompletionKind>,
leaks: Vec<LeakedObligationInfo>,
}
impl fmt::Display for ObligationLeakError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let completion = self
.completion
.map_or("unknown", TaskCompletionKind::as_str);
write!(
f,
"obligation leak: task={:?} region={:?} completion={} leaked={}",
self.task_id,
self.region_id,
completion,
self.leaks.len()
)?;
for leak in &self.leaks {
write!(f, "\n - {leak}")?;
}
Ok(())
}
}
#[derive(Debug, Clone, Copy)]
struct CancelRegionNode {
id: RegionId,
parent: Option<RegionId>,
depth: usize,
}
#[derive(Debug, Clone)]
struct RuntimeObservability {
config: ObservabilityConfig,
collector: LogCollector,
}
impl RuntimeObservability {
fn new(config: ObservabilityConfig) -> Self {
let collector = config.create_collector();
Self { config, collector }
}
fn for_task(&self, region: RegionId, task: TaskId) -> ObservabilityState {
ObservabilityState::new_with_config(
region,
task,
&self.config,
Some(self.collector.clone()),
)
}
}
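/// Central bookkeeping for a runtime instance: the region, task, and obligation
/// tables plus the trace buffer, metrics provider, I/O and timer drivers,
/// cancel-protocol validator, finalizer tracking, and obligation-leak policy.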
pub struct RuntimeState {
instance_id: u64,
pub regions: RegionTable,
pub tasks: TaskTable,
pub obligations: ObligationTable,
pub now: Time,
pub root_region: Option<RegionId>,
pub trace: TraceBufferHandle,
pub metrics: Arc<dyn MetricsProvider>,
io_driver: Option<IoDriverHandle>,
timer_driver: Option<TimerDriverHandle>,
logical_clock_mode: LogicalClockMode,
cancel_attribution: CancelAttributionConfig,
entropy_source: Arc<dyn EntropySource>,
observability: Option<RuntimeObservability>,
blocking_pool: Option<BlockingPoolHandle>,
obligation_leak_response: ObligationLeakResponse,
leak_escalation: Option<LeakEscalation>,
leak_count: u64,
handling_leaks: usize,
in_flight_leak_ids: HashSet<ObligationId>,
finalizing_regions: Vec<RegionId>,
recently_closed_regions: HashSet<RegionId>,
recently_closed_region_order: VecDeque<RegionId>,
pending_finalizer_ids: HashMap<RegionId, Vec<u64>>,
async_finalizer_tasks: HashMap<TaskId, u64>,
active_async_finalizers: HashMap<RegionId, TaskId>,
finalizer_history: Vec<FinalizerHistoryEvent>,
next_finalizer_id: u64,
region_table_epoch: EpochId,
task_table_epoch: EpochId,
obligation_table_epoch: EpochId,
epoch_tracker: super::epoch_tracker::EpochConsistencyTracker,
state_verifier: Arc<super::state_verifier::StateTransitionVerifier>,
cancel_protocol_validator: Arc<parking_lot::Mutex<CancelProtocolValidator>>,
debt_monitor: Arc<crate::observability::CancellationDebtMonitor>,
resource_monitor: Arc<ResourceMonitor>,
}
impl std::fmt::Debug for RuntimeState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RuntimeState")
.field("regions", &self.regions)
.field("tasks", &self.tasks)
.field("obligations", &self.obligations)
.field("now", &self.now)
.field("instance_id", &self.instance_id)
.field("root_region", &self.root_region)
.field("trace", &self.trace)
.field("metrics", &"<dyn MetricsProvider>")
.field("io_driver", &self.io_driver)
.field("timer_driver", &self.timer_driver)
.field("logical_clock_mode", &self.logical_clock_mode)
.field("cancel_attribution", &self.cancel_attribution)
.field("entropy_source", &"<dyn EntropySource>")
.field("observability", &self.observability.is_some())
.field("blocking_pool", &self.blocking_pool.is_some())
.field("obligation_leak_response", &self.obligation_leak_response)
.field("leak_escalation", &self.leak_escalation)
.field("leak_count", &self.leak_count)
.field("handling_leaks", &self.handling_leaks)
.field("in_flight_leak_ids", &self.in_flight_leak_ids.len())
.field("finalizing_region_count", &self.finalizing_regions.len())
.field(
"recently_closed_region_count",
&self.recently_closed_regions.len(),
)
.field(
"recently_closed_region_order_count",
&self.recently_closed_region_order.len(),
)
.field(
"pending_finalizer_regions",
&self.pending_finalizer_ids.len(),
)
.field("async_finalizer_tasks", &self.async_finalizer_tasks.len())
.field(
"active_async_finalizers",
&self.active_async_finalizers.len(),
)
.field("finalizer_history_len", &self.finalizer_history.len())
.field("next_finalizer_id", &self.next_finalizer_id)
.field("region_table_epoch", &self.region_table_epoch)
.field("task_table_epoch", &self.task_table_epoch)
.field("obligation_table_epoch", &self.obligation_table_epoch)
.field("state_verifier", &"<StateTransitionVerifier>")
.field("cancel_protocol_validator", &"<CancelProtocolValidator>")
.field("debt_monitor", &"<CancellationDebtMonitor>")
.finish()
}
}
impl RuntimeState {
const RECENTLY_CLOSED_REGION_CAPACITY: usize = 4096;
#[must_use]
pub fn new() -> Self {
Self::new_with_metrics(Arc::new(NoOpMetrics))
}
#[must_use]
pub fn new_with_metrics(metrics: Arc<dyn MetricsProvider>) -> Self {
Self {
instance_id: NEXT_RUNTIME_INSTANCE_ID.fetch_add(1, Ordering::Relaxed),
regions: RegionTable::new(),
tasks: TaskTable::new(),
obligations: ObligationTable::new(),
now: Time::ZERO,
root_region: None,
trace: TraceBufferHandle::new(4096),
metrics,
io_driver: None,
timer_driver: None,
logical_clock_mode: LogicalClockMode::Lamport,
cancel_attribution: CancelAttributionConfig::default(),
entropy_source: Arc::new(OsEntropy),
observability: None,
blocking_pool: None,
obligation_leak_response: ObligationLeakResponse::Log,
leak_escalation: None,
leak_count: 0,
handling_leaks: 0,
in_flight_leak_ids: HashSet::new(),
finalizing_regions: Vec::new(),
recently_closed_regions: HashSet::new(),
recently_closed_region_order: VecDeque::new(),
pending_finalizer_ids: HashMap::new(),
async_finalizer_tasks: HashMap::new(),
active_async_finalizers: HashMap::new(),
finalizer_history: Vec::new(),
next_finalizer_id: 0,
region_table_epoch: EpochId::GENESIS,
task_table_epoch: EpochId::GENESIS,
obligation_table_epoch: EpochId::GENESIS,
epoch_tracker: super::epoch_tracker::EpochConsistencyTracker::new(),
state_verifier: Arc::new(super::state_verifier::StateTransitionVerifier::new(
super::state_verifier::StateVerifierConfig::default(),
)),
cancel_protocol_validator: Arc::new(parking_lot::Mutex::new(
CancelProtocolValidator::new(CancelValidationLevel::Basic),
)),
debt_monitor: Arc::new(crate::observability::CancellationDebtMonitor::default()),
resource_monitor: Arc::new(ResourceMonitor::new(MonitorConfig::default())),
}
}
#[must_use]
pub fn with_reactor_and_metrics(
reactor: Arc<dyn Reactor>,
metrics: Arc<dyn MetricsProvider>,
) -> Self {
let mut state = Self::new_with_metrics(metrics);
state.io_driver = Some(IoDriverHandle::new(reactor));
state.timer_driver = Some(TimerDriverHandle::with_wall_clock());
state.logical_clock_mode = LogicalClockMode::Hybrid;
state
}
#[must_use]
pub fn with_reactor(reactor: Arc<dyn Reactor>) -> Self {
Self::with_reactor_and_metrics(reactor, Arc::new(NoOpMetrics))
}
#[must_use]
pub fn without_reactor() -> Self {
Self::new()
}
#[inline]
#[must_use]
pub fn io_driver(&self) -> Option<&IoDriverHandle> {
self.io_driver.as_ref()
}
pub fn io_driver_mut(&self) -> Option<parking_lot::MutexGuard<'_, IoDriver>> {
self.io_driver.as_ref().map(IoDriverHandle::lock)
}
#[inline]
#[must_use]
pub fn io_driver_handle(&self) -> Option<IoDriverHandle> {
self.io_driver.clone()
}
pub fn set_io_driver(&mut self, driver: IoDriverHandle) {
self.io_driver = Some(driver);
}
#[inline]
#[must_use]
pub fn timer_driver(&self) -> Option<&TimerDriverHandle> {
self.timer_driver.as_ref()
}
#[inline]
#[must_use]
pub fn timer_driver_handle(&self) -> Option<TimerDriverHandle> {
self.timer_driver.clone()
}
#[inline]
fn current_runtime_time(&self) -> Time {
self.timer_driver
.as_ref()
.map_or(self.now, TimerDriverHandle::now)
}
#[inline]
#[must_use]
pub fn blocking_pool_handle(&self) -> Option<BlockingPoolHandle> {
self.blocking_pool.clone()
}
#[inline]
#[must_use]
pub fn state_verifier(&self) -> &Arc<super::state_verifier::StateTransitionVerifier> {
&self.state_verifier
}
#[must_use]
pub fn state_verifier_stats(&self) -> super::state_verifier::StateVerifierStatsSnapshot {
self.state_verifier.stats()
}
#[inline]
#[must_use]
pub fn cancel_protocol_validator(&self) -> &Arc<parking_lot::Mutex<CancelProtocolValidator>> {
&self.cancel_protocol_validator
}
fn validate_region_protocol_transition(
&self,
region_id: RegionId,
event: RegionEvent,
context: &RegionContext,
) -> TransitionResult {
let mut validator = self.cancel_protocol_validator.lock();
validator.validate_region_transition(region_id, event, context)
}
fn validate_task_protocol_transition(
&self,
task_id: TaskId,
event: TaskEvent,
context: &TaskContext,
) -> TransitionResult {
let mut validator = self.cancel_protocol_validator.lock();
validator.validate_task_transition(task_id, event, context)
}
fn validate_obligation_protocol_transition(
&self,
obligation_id: ObligationId,
event: ObligationEvent,
context: &ObligationContext,
) -> TransitionResult {
let mut validator = self.cancel_protocol_validator.lock();
validator.validate_obligation_transition(obligation_id, event, context)
}
pub fn set_blocking_pool(&mut self, handle: BlockingPoolHandle) {
self.blocking_pool = Some(handle);
}
pub fn set_timer_driver(&mut self, driver: TimerDriverHandle) {
self.timer_driver = Some(driver);
}
#[must_use]
pub fn logical_clock_mode(&self) -> &LogicalClockMode {
&self.logical_clock_mode
}
pub fn set_logical_clock_mode(&mut self, mode: LogicalClockMode) {
self.logical_clock_mode = mode;
}
#[must_use]
pub fn cancel_attribution_config(&self) -> CancelAttributionConfig {
self.cancel_attribution
}
pub fn set_cancel_attribution_config(&mut self, config: CancelAttributionConfig) {
self.cancel_attribution = config;
}
#[inline]
#[must_use]
pub fn entropy_source(&self) -> Arc<dyn EntropySource> {
self.entropy_source.clone()
}
pub fn set_entropy_source(&mut self, source: Arc<dyn EntropySource>) {
self.entropy_source = source;
}
pub fn set_observability_config(&mut self, config: ObservabilityConfig) {
self.observability = Some(RuntimeObservability::new(config));
}
pub fn clear_observability_config(&mut self) {
self.observability = None;
}
#[must_use]
pub(crate) fn observability_for_task(
&self,
region: RegionId,
task: TaskId,
) -> Option<ObservabilityState> {
self.observability
.as_ref()
.map(|obs| obs.for_task(region, task))
}
pub fn set_obligation_leak_response(&mut self, response: ObligationLeakResponse) {
self.obligation_leak_response = response;
}
pub fn set_leak_escalation(&mut self, escalation: Option<LeakEscalation>) {
self.leak_escalation = escalation;
}
#[must_use]
pub fn leak_count(&self) -> u64 {
self.leak_count
}
#[inline]
#[must_use]
pub fn trace_handle(&self) -> TraceBufferHandle {
self.trace.clone()
}
#[inline]
#[must_use]
pub fn instance_id(&self) -> u64 {
self.instance_id
}
#[inline]
#[must_use]
pub fn metrics_provider(&self) -> Arc<dyn MetricsProvider> {
self.metrics.clone()
}
pub fn set_metrics_provider(&mut self, provider: Arc<dyn MetricsProvider>) {
self.metrics = provider;
}
#[inline]
#[must_use]
pub fn debt_monitor(&self) -> Arc<crate::observability::CancellationDebtMonitor> {
self.debt_monitor.clone()
}
#[inline]
#[must_use]
pub fn task(&self, task_id: TaskId) -> Option<&TaskRecord> {
self.tasks.task(task_id)
}
#[inline]
pub fn task_mut(&mut self, task_id: TaskId) -> Option<&mut TaskRecord> {
self.tasks.task_mut(task_id)
}
#[inline]
pub fn insert_task(&mut self, record: TaskRecord) -> ArenaIndex {
self.tasks.insert_task(record)
}
#[inline]
pub fn insert_task_with<F>(&mut self, f: F) -> ArenaIndex
where
F: FnOnce(ArenaIndex) -> TaskRecord,
{
self.tasks.insert_task_with(f)
}
#[inline]
pub fn remove_task(&mut self, task_id: TaskId) -> Option<TaskRecord> {
let removed = self.tasks.remove_task(task_id);
if removed.is_some() {
self.notify_runtime_epoch_advance(super::epoch_tracker::ModuleId::TaskTable);
}
removed
}
pub fn tasks_iter(&self) -> impl Iterator<Item = (ArenaIndex, &TaskRecord)> {
self.tasks.tasks_arena().iter()
}
#[must_use]
pub fn tasks_is_empty(&self) -> bool {
self.tasks.tasks_arena().is_empty()
}
#[inline]
#[must_use]
pub fn tasks_arena(&self) -> &Arena<TaskRecord> {
self.tasks.tasks_arena()
}
#[inline]
pub fn tasks_arena_mut(&mut self) -> &mut Arena<TaskRecord> {
self.tasks.tasks_arena_mut()
}
#[inline]
#[must_use]
pub fn region(&self, region_id: RegionId) -> Option<&RegionRecord> {
self.regions.get(region_id.arena_index())
}
#[inline]
#[must_use]
pub fn region_was_closed(&self, region_id: RegionId) -> bool {
        self.recently_closed_regions.contains(&region_id)
}
#[inline]
pub fn region_mut(&mut self, region_id: RegionId) -> Option<&mut RegionRecord> {
self.regions.get_mut(region_id.arena_index())
}
pub fn regions_iter(&self) -> impl Iterator<Item = (ArenaIndex, &RegionRecord)> {
self.regions.iter()
}
#[must_use]
pub fn regions_len(&self) -> usize {
self.regions.len()
}
#[must_use]
pub fn regions_is_empty(&self) -> bool {
self.regions.is_empty()
}
#[must_use]
pub fn obligation(&self, obligation_id: ObligationId) -> Option<&ObligationRecord> {
self.obligations.get(obligation_id.arena_index())
}
#[inline]
pub fn obligation_mut(&mut self, obligation_id: ObligationId) -> Option<&mut ObligationRecord> {
self.obligations.get_mut(obligation_id.arena_index())
}
pub fn obligations_iter(&self) -> impl Iterator<Item = (ArenaIndex, &ObligationRecord)> {
self.obligations.iter()
}
#[must_use]
pub fn obligations_len(&self) -> usize {
self.obligations.len()
}
#[must_use]
pub fn obligations_is_empty(&self) -> bool {
self.obligations.is_empty()
}
#[inline]
#[must_use]
pub fn has_io_driver(&self) -> bool {
self.io_driver.is_some()
}
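    /// Captures a point-in-time view of all regions, tasks (with their pending
    /// obligations), obligations, and recent trace events.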
#[must_use]
pub fn snapshot(&self) -> RuntimeSnapshot {
let now = self.current_runtime_time();
let mut obligations_by_task: HashMap<TaskId, Vec<ObligationId>> =
HashMap::with_capacity(self.obligations_len());
let obligations: Vec<ObligationSnapshot> = self
.obligations_iter()
.map(|(_, record)| {
obligations_by_task
.entry(record.holder)
.or_default()
.push(record.id);
ObligationSnapshot::from_record(record)
})
.collect();
let regions: Vec<RegionSnapshot> = self
.regions_iter()
.map(|(_, record)| RegionSnapshot::from_record(record))
.collect();
let tasks: Vec<TaskSnapshot> = self
.tasks_iter()
.map(|(_, record)| {
let task_obligations = obligations_by_task
.get(&record.id)
.cloned()
.unwrap_or_default();
TaskSnapshot::from_record(record, task_obligations)
})
.collect();
let recent_events: Vec<EventSnapshot> = self
.trace
.snapshot()
.iter()
.map(EventSnapshot::from_event)
.collect();
RuntimeSnapshot {
timestamp: now.as_nanos(),
regions,
tasks,
obligations,
recent_events,
}
}
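    /// Creates the root region of this runtime; intended to be called at most
    /// once (a second call trips a debug assertion).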
pub fn create_root_region(&mut self, budget: Budget) -> RegionId {
debug_assert!(
self.root_region.is_none(),
"create_root_region called twice; previous root: {:?}",
self.root_region
);
let now = self.current_runtime_time();
let id = self.regions.create_root(budget, now);
{
let mut validator = self.cancel_protocol_validator.lock();
validator.register_region(id);
}
self.root_region = Some(id);
self.record_trace_event(|seq| TraceEvent::region_created(seq, now, id, None));
self.metrics.region_created(id, None);
self.notify_runtime_epoch_advance(super::epoch_tracker::ModuleId::RegionTable);
id
}
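    /// Creates a child region under `parent`, subject to resource-pressure and
    /// parent-state checks.
    ///
    /// Minimal sketch (illustrative only; assumes a `Budget` value is in scope):
    /// ```rust,ignore
    /// let root = state.create_root_region(budget);
    /// let child = state.create_child_region(root, budget)?;
    /// ```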
pub fn create_child_region(
&mut self,
parent: RegionId,
budget: Budget,
) -> Result<RegionId, RegionCreateError> {
self.check_resource_pressure_for_region(RegionPriority::Normal)?;
let now = self.current_runtime_time();
let id = self.regions.create_child(parent, budget, now)?;
self.record_trace_event(|seq| TraceEvent::region_created(seq, now, id, Some(parent)));
self.metrics.region_created(id, Some(parent));
self.notify_runtime_epoch_advance(super::epoch_tracker::ModuleId::RegionTable);
Ok(id)
}
pub fn set_region_limits(&mut self, region: RegionId, limits: RegionLimits) -> bool {
self.regions.set_limits(region, limits)
}
#[must_use]
pub fn region_limits(&self, region: RegionId) -> Option<RegionLimits> {
self.regions.limits(region)
}
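    /// Shared task-creation plumbing: allocates the `TaskRecord`, registers the
    /// task with the cancel-protocol validator, admits it into the owning region
    /// (cleanup tasks use the cleanup admission path), and wires up the task's
    /// `Cx` plus the oneshot channel that will carry its join result.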
#[allow(clippy::type_complexity)]
pub(crate) fn create_task_infrastructure<T>(
&mut self,
region: RegionId,
budget: Budget,
cleanup_task: bool,
) -> Result<
(
TaskId,
crate::runtime::TaskHandle<T>,
crate::cx::Cx,
crate::channel::oneshot::Sender<Result<T, crate::runtime::task_handle::JoinError>>,
),
SpawnError,
>
where
T: Send + 'static,
{
use crate::channel::oneshot;
let (result_tx, result_rx) =
oneshot::channel::<Result<T, crate::runtime::task_handle::JoinError>>();
let now = self.current_runtime_time();
let idx = self.tasks.insert_task_with(|idx| {
TaskRecord::new_with_time(TaskId::from_arena(idx), region, budget, now)
});
let task_id = TaskId::from_arena(idx);
{
let mut validator = self.cancel_protocol_validator.lock();
validator.register_task(task_id, region);
}
let context = TaskContext {
task_id,
region_id: region,
spawned_at: now,
validation_level: CancelValidationLevel::Basic,
};
let validation_result = self.validate_task_protocol_transition(
task_id,
            TaskEvent::Start,
            &context,

);
if matches!(
validation_result,
TransitionResult::Invalid { .. } | TransitionResult::InvariantViolation { .. }
) {
log_cancel_protocol_violation("task creation", &validation_result);
}
if let Some(region_record) = self.regions.get(region.arena_index()) {
let admission = if cleanup_task {
region_record.add_cleanup_task(task_id)
} else {
region_record.add_task(task_id)
};
if let Err(err) = admission {
let _ = self.remove_task(task_id);
return Err(match err {
AdmissionError::Closed => SpawnError::RegionClosed(region),
AdmissionError::LimitReached { limit, live, .. } => {
SpawnError::RegionAtCapacity {
region,
limit,
live,
}
}
});
}
} else {
let _ = self.remove_task(task_id);
return Err(SpawnError::RegionNotFound(region));
}
let entropy = self.entropy_source.fork(task_id);
let observability = self
.observability
.as_ref()
.map(|obs| obs.for_task(region, task_id));
let logical_clock = self
.logical_clock_mode
.build_handle(self.timer_driver_handle());
let cx = crate::cx::Cx::new_with_drivers(
region,
task_id,
budget,
observability,
self.io_driver_handle(),
None,
self.timer_driver_handle(),
Some(entropy),
)
.with_blocking_pool_handle(self.blocking_pool_handle())
.with_logical_clock(logical_clock);
cx.set_trace_buffer(self.trace_handle());
let cx_weak = std::sync::Arc::downgrade(&cx.inner);
if let Some(record) = self.task_mut(task_id) {
record.set_cx_inner(cx.inner.clone());
record.set_cx(cx.clone());
}
self.record_task_spawn(task_id, region);
debug!(
task_id = ?task_id,
region_id = ?region,
initial_state = "Created",
poll_quota = budget.poll_quota,
"task created via RuntimeState"
);
let handle = crate::runtime::TaskHandle::new(task_id, result_rx, cx_weak);
Ok((task_id, handle, cx, result_tx))
}
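    /// Creates a task in `region` and stores its wrapped future for the
    /// scheduler; panics inside the future are caught and surfaced through the
    /// returned `TaskHandle` as `JoinError::Panicked`.
    ///
    /// Minimal sketch (illustrative only; assumes a `Budget` value is in scope):
    /// ```rust,ignore
    /// let (task_id, handle) = state.create_task(region, budget, async { 42 })?;
    /// ```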
pub fn create_task<F, T>(
&mut self,
region: RegionId,
budget: Budget,
future: F,
) -> Result<(TaskId, crate::runtime::TaskHandle<T>), SpawnError>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
use crate::runtime::task_handle::JoinError;
let (task_id, handle, cx, result_tx) =
self.create_task_infrastructure(region, budget, false)?;
let wrapped_future = async move {
match (CatchUnwind { inner: future }).await {
Ok(result) => {
let _ = result_tx.send(&cx, Ok::<_, JoinError>(result));
crate::types::Outcome::Ok(())
}
Err(payload) => {
let panic_payload =
crate::types::outcome::PanicPayload::new(payload_to_string(&payload));
let _ = result_tx.send(
&cx,
Err::<T, JoinError>(JoinError::Panicked(panic_payload.clone())),
);
crate::types::Outcome::Panicked(panic_payload)
}
}
};
self.tasks
.store_spawned_task(task_id, StoredTask::new_with_id(wrapped_future, task_id));
self.notify_runtime_epoch_advance(super::epoch_tracker::ModuleId::TaskTable);
Ok((task_id, handle))
}
fn logical_time_for_task(&self, task_id: TaskId) -> Option<LogicalTime> {
let record = self.task(task_id)?;
let cx = record.cx.as_ref()?;
Some(cx.logical_tick())
}
pub(crate) fn record_trace_event<F>(&self, build: F)
where
F: FnOnce(u64) -> TraceEvent,
{
self.trace.record_event(build);
}
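    /// Advances the epoch cursor for the given table module and reports the
    /// transition to the epoch consistency tracker; other modules are ignored.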
pub(crate) fn notify_runtime_epoch_advance(&mut self, module: super::epoch_tracker::ModuleId) {
let now = self.current_runtime_time();
let cursor = match module {
super::epoch_tracker::ModuleId::RegionTable => &mut self.region_table_epoch,
super::epoch_tracker::ModuleId::TaskTable => &mut self.task_table_epoch,
super::epoch_tracker::ModuleId::ObligationTable => &mut self.obligation_table_epoch,
_ => return,
};
let from_epoch = *cursor;
let to_epoch = from_epoch.next();
*cursor = to_epoch;
self.epoch_tracker
.notify_epoch_transition(module, from_epoch, to_epoch, now);
}
fn record_task_trace_event<F>(&self, task_id: TaskId, build: F)
where
F: FnOnce(u64) -> TraceEvent,
{
let logical_time = self.logical_time_for_task(task_id);
self.trace.record_event(move |seq| {
let event = build(seq);
if let Some(logical_time) = logical_time {
event.with_logical_time(logical_time)
} else {
event
}
});
}
pub(crate) fn record_task_spawn(&self, task_id: TaskId, region: RegionId) {
let now = self.current_runtime_time();
self.record_task_trace_event(task_id, |seq| TraceEvent::spawn(seq, now, task_id, region));
self.metrics.task_spawned(region, task_id);
}
fn record_task_complete(&self, task: &TaskRecord) {
let now = self.current_runtime_time();
self.record_task_trace_event(task.id, |seq| {
TraceEvent::complete(seq, now, task.id, task.owner)
});
let duration = Duration::from_nanos(now.duration_since(task.created_at()));
let outcome_kind = match &task.state {
TaskState::Completed(outcome) => OutcomeKind::from(outcome),
_ => OutcomeKind::Err,
};
self.metrics.task_completed(task.id, outcome_kind, duration);
}
fn capture_obligation_backtrace() -> Option<Arc<Backtrace>> {
if cfg!(debug_assertions) {
Some(Arc::new(Backtrace::capture()))
} else {
None
}
}
fn collect_obligation_leaks<F>(&self, mut predicate: F) -> Vec<LeakedObligationInfo>
where
F: FnMut(&ObligationRecord) -> bool,
{
let now = self.current_runtime_time();
self.obligations
.iter()
.filter_map(|(_, record)| {
if !record.is_pending() || !predicate(record) {
return None;
}
let held_duration_ns = now.duration_since(record.reserved_at);
Some(LeakedObligationInfo {
id: record.id,
kind: record.kind,
holder: record.holder,
region: record.region,
acquired_at: record.acquired_at,
held_duration_ns,
description: record.description.clone(),
acquire_backtrace: record.acquire_backtrace.clone(),
})
})
.collect()
}
fn collect_obligation_leaks_for_holder(&self, task_id: TaskId) -> Vec<LeakedObligationInfo> {
let now = self.current_runtime_time();
self.obligations
.ids_for_holder(task_id)
.iter()
.filter_map(|id| {
let record = self.obligations.get(id.arena_index())?;
if !record.is_pending() {
return None;
}
let held_duration_ns = now.duration_since(record.reserved_at);
Some(LeakedObligationInfo {
id: record.id,
kind: record.kind,
holder: record.holder,
region: record.region,
acquired_at: record.acquired_at,
held_duration_ns,
description: record.description.clone(),
acquire_backtrace: record.acquire_backtrace.clone(),
})
})
.collect()
}
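    /// Applies the configured `ObligationLeakResponse` (possibly escalated once
    /// the cumulative leak count reaches the escalation threshold) to newly
    /// detected leaks: `Panic` fails fast, `Log`/`Silent` mark the obligations
    /// leaked, and `Recover` aborts them. A `Panic` response is downgraded to
    /// `Log` while the thread is already panicking to avoid a double-panic abort.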
#[allow(clippy::needless_pass_by_value)]
fn handle_obligation_leaks(&mut self, error: ObligationLeakError) {
if error.leaks.is_empty() {
return;
}
let new_leaks: Vec<LeakedObligationInfo> = error
.leaks
.iter()
.filter(|leak| {
self.obligations
.get(leak.id.arena_index())
.is_some_and(ObligationRecord::is_pending)
&& !self.in_flight_leak_ids.contains(&leak.id)
})
.cloned()
.collect();
if new_leaks.is_empty() {
return;
}
let leak_ids: Vec<ObligationId> = new_leaks.iter().map(|leak| leak.id).collect();
self.in_flight_leak_ids.extend(leak_ids.iter().copied());
self.handling_leaks = self.handling_leaks.saturating_add(1);
self.leak_count = self.leak_count.saturating_add(leak_ids.len() as u64);
let mut response = if let Some(ref esc) = self.leak_escalation {
if self.leak_count >= esc.threshold {
esc.escalate_to
} else {
self.obligation_leak_response
}
} else {
self.obligation_leak_response
};
if matches!(response, ObligationLeakResponse::Panic) && std::thread::panicking() {
crate::tracing_compat::error!(
task_id = ?error.task_id,
"obligation leaks detected during panic; downgrading Panic policy to Log to prevent double-panic abort"
);
response = ObligationLeakResponse::Log;
}
match response {
ObligationLeakResponse::Panic => {
for &id in &leak_ids {
let _ = self.mark_obligation_leaked(id);
}
let msg = error.to_string();
crate::tracing_compat::error!(
task_id = ?error.task_id,
region_id = ?error.region_id,
completion = %error
.completion
.map_or("unknown", TaskCompletionKind::as_str),
leak_count = leak_ids.len(),
cumulative_leaks = self.leak_count,
details = %error,
"obligation leaks detected (fail-fast)"
);
self.handling_leaks = self.handling_leaks.saturating_sub(1);
for id in leak_ids {
self.in_flight_leak_ids.remove(&id);
}
std::panic::panic_any(msg);
}
ObligationLeakResponse::Log => {
for &id in &leak_ids {
let _ = self.mark_obligation_leaked(id);
}
crate::tracing_compat::error!(
task_id = ?error.task_id,
region_id = ?error.region_id,
completion = %error
.completion
.map_or("unknown", TaskCompletionKind::as_str),
leak_count = leak_ids.len(),
cumulative_leaks = self.leak_count,
details = %error,
"obligation leaks detected"
);
}
ObligationLeakResponse::Silent => {
for &id in &leak_ids {
let _ = self.mark_obligation_leaked(id);
}
}
ObligationLeakResponse::Recover => {
for &id in &leak_ids {
let _ = self.abort_obligation(id, ObligationAbortReason::Error);
}
crate::tracing_compat::warn!(
task_id = ?error.task_id,
region_id = ?error.region_id,
completion = %error
.completion
.map_or("unknown", TaskCompletionKind::as_str),
leak_count = leak_ids.len(),
cumulative_leaks = self.leak_count,
details = %error,
"obligation leaks recovered via auto-abort"
);
}
}
self.handling_leaks = self.handling_leaks.saturating_sub(1);
for id in leak_ids {
self.in_flight_leak_ids.remove(&id);
}
}
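    /// Reserves an obligation of `kind` held by `holder` inside `region`, after
    /// checking that the holder belongs to the region and that the region can
    /// admit another obligation.
    ///
    /// Every reserved obligation must eventually be settled via
    /// `commit_obligation` or `abort_obligation`; anything still pending when the
    /// holder completes is reported as a leak. Minimal sketch (illustrative only;
    /// `kind`, `task`, and `region` come from earlier setup):
    /// ```rust,ignore
    /// let ob = state.create_obligation(kind, task, region, None)?;
    /// // ... do the guarded work ...
    /// state.commit_obligation(ob)?;
    /// ```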
#[allow(clippy::result_large_err)]
#[track_caller]
pub fn create_obligation(
&mut self,
kind: ObligationKind,
holder: TaskId,
region: RegionId,
description: Option<String>,
) -> Result<ObligationId, Error> {
{
let Some(region_record) = self.regions.get(region.arena_index()) else {
return Err(Error::new(ErrorKind::RegionClosed).with_message("region not found"));
};
let Some(task_record) = self.task(holder) else {
return Err(Error::new(ErrorKind::TaskNotOwned)
.with_message(format!("holder task {holder:?} not found")));
};
if task_record.owner != region {
return Err(Error::new(ErrorKind::TaskNotOwned).with_message(format!(
"holder task {holder:?} is owned by region {:?}, not {region:?}",
task_record.owner
)));
}
if let Err(err) = region_record.try_reserve_obligation() {
return Err(match err {
AdmissionError::Closed => {
Error::new(ErrorKind::RegionClosed).with_message("region closed")
}
AdmissionError::LimitReached { limit, live, .. } => {
Error::new(ErrorKind::AdmissionDenied).with_message(format!(
"region {region:?} obligation limit {limit} reached (live {live})"
))
}
});
}
}
let acquired_at = SourceLocation::from_panic_location(std::panic::Location::caller());
let acquire_backtrace = Self::capture_obligation_backtrace();
let now = self.current_runtime_time();
let obligation_id =
self.obligations
.create(super::obligation_table::ObligationCreateArgs {
kind,
holder,
region,
now,
description,
acquired_at,
acquire_backtrace,
});
{
let mut validator = self.cancel_protocol_validator.lock();
validator.register_obligation(obligation_id);
}
let context = ObligationContext {
obligation_id,
region_id: region,
created_at: now,
validation_level: CancelValidationLevel::Basic,
};
let validation_result = self.validate_obligation_protocol_transition(
obligation_id,
ObligationEvent::Reserve {
token: obligation_id.arena_index().index() as u64,
},
&context,
);
if matches!(
validation_result,
TransitionResult::Invalid { .. } | TransitionResult::InvariantViolation { .. }
) {
log_cancel_protocol_violation("obligation creation", &validation_result);
}
let _guard = crate::tracing_compat::debug_span!(
"obligation_reserve",
obligation_id = ?obligation_id,
kind = ?kind,
holder_task = ?holder,
region_id = ?region
)
.entered();
crate::tracing_compat::debug!(
obligation_id = ?obligation_id,
kind = ?kind,
holder_task = ?holder,
region_id = ?region,
"obligation reserved"
);
self.record_task_trace_event(holder, |seq| {
TraceEvent::obligation_reserve(seq, now, obligation_id, holder, region, kind)
});
self.metrics.obligation_created(region);
self.notify_runtime_epoch_advance(super::epoch_tracker::ModuleId::ObligationTable);
Ok(obligation_id)
}
#[allow(clippy::result_large_err)]
pub fn commit_obligation(&mut self, obligation: ObligationId) -> Result<u64, Error> {
let now = self.current_runtime_time();
if let Some(record) = self.obligations.get(obligation.arena_index()) {
let context = ObligationContext {
obligation_id: obligation,
region_id: record.region,
created_at: record.reserved_at,
validation_level: CancelValidationLevel::Basic,
};
let validation_result = self.validate_obligation_protocol_transition(
obligation,
ObligationEvent::Commit,
&context,
);
if matches!(
validation_result,
TransitionResult::Invalid { .. } | TransitionResult::InvariantViolation { .. }
) {
log_cancel_protocol_violation("obligation commit", &validation_result);
}
}
let info = self.obligations.commit(obligation, now)?;
let span = crate::tracing_compat::debug_span!(
"obligation_commit",
obligation_id = ?info.id,
kind = ?info.kind,
holder_task = ?info.holder,
region_id = ?info.region,
duration_ns = info.duration
);
let _span_guard = span.enter();
crate::tracing_compat::debug!(
obligation_id = ?info.id,
kind = ?info.kind,
holder_task = ?info.holder,
region_id = ?info.region,
duration_ns = info.duration,
"obligation committed"
);
self.record_task_trace_event(info.holder, |seq| {
TraceEvent::obligation_commit(
seq,
now,
info.id,
info.holder,
info.region,
info.kind,
info.duration,
)
});
self.metrics.obligation_discharged(info.region);
self.notify_runtime_epoch_advance(super::epoch_tracker::ModuleId::ObligationTable);
if let Some(region_record) = self.regions.get(info.region.arena_index()) {
region_record.resolve_obligation();
}
self.advance_region_state(info.region);
Ok(info.duration)
}
#[allow(clippy::result_large_err)]
pub fn abort_obligation(
&mut self,
obligation: ObligationId,
reason: ObligationAbortReason,
) -> Result<u64, Error> {
let now = self.current_runtime_time();
if let Some(record) = self.obligations.get(obligation.arena_index()) {
let context = ObligationContext {
obligation_id: obligation,
region_id: record.region,
created_at: record.reserved_at,
validation_level: CancelValidationLevel::Basic,
};
let validation_result = self.validate_obligation_protocol_transition(
obligation,
ObligationEvent::Abort {
reason: format!("{reason:?}"),
},
&context,
);
if matches!(
validation_result,
TransitionResult::Invalid { .. } | TransitionResult::InvariantViolation { .. }
) {
log_cancel_protocol_violation("obligation abort", &validation_result);
}
}
let info = self.obligations.abort(obligation, now, reason)?;
let span = crate::tracing_compat::debug_span!(
"obligation_abort",
obligation_id = ?info.id,
kind = ?info.kind,
holder_task = ?info.holder,
region_id = ?info.region,
duration_ns = info.duration,
abort_reason = %info.reason
);
let _span_guard = span.enter();
crate::tracing_compat::debug!(
obligation_id = ?info.id,
kind = ?info.kind,
holder_task = ?info.holder,
region_id = ?info.region,
duration_ns = info.duration,
abort_reason = %info.reason,
"obligation aborted"
);
self.record_task_trace_event(info.holder, |seq| {
TraceEvent::obligation_abort(
seq,
now,
info.id,
info.holder,
info.region,
info.kind,
info.duration,
info.reason,
)
});
self.metrics.obligation_discharged(info.region);
let cancel_reason = CancelReason::new(CancelKind::User);
self.debt_monitor.queue_work(
crate::observability::WorkType::ObligationSettlement,
format!("obligation_{}_{}", info.id, info.holder),
1, 1, &cancel_reason,
CancelKind::Shutdown,
Vec::new(),
);
self.notify_runtime_epoch_advance(super::epoch_tracker::ModuleId::ObligationTable);
if let Some(region_record) = self.regions.get(info.region.arena_index()) {
region_record.resolve_obligation();
}
self.advance_region_state(info.region);
Ok(info.duration)
}
#[allow(clippy::result_large_err)]
pub fn mark_obligation_leaked(&mut self, obligation: ObligationId) -> Result<u64, Error> {
let now = self.current_runtime_time();
let info = self.obligations.mark_leaked(obligation, now)?;
self.record_task_trace_event(info.holder, |seq| {
TraceEvent::obligation_leak(
seq,
now,
info.id,
info.holder,
info.region,
info.kind,
info.duration,
)
});
self.metrics.obligation_leaked(info.region);
if self.obligation_leak_response != ObligationLeakResponse::Silent {
let span = crate::tracing_compat::error_span!(
"obligation_leak",
obligation_id = ?info.id,
kind = ?info.kind,
holder_task = ?info.holder,
region_id = ?info.region,
duration_ns = info.duration,
acquired_at = %info.acquired_at
);
let _span_guard = span.enter();
#[allow(clippy::single_match, unused_variables)]
match info.acquire_backtrace.as_ref() {
Some(backtrace) => {
crate::tracing_compat::error!(
obligation_id = ?info.id,
kind = ?info.kind,
holder_task = ?info.holder,
region_id = ?info.region,
duration_ns = info.duration,
acquired_at = %info.acquired_at,
acquire_backtrace = ?backtrace,
"obligation leaked"
);
}
None => {
crate::tracing_compat::error!(
obligation_id = ?info.id,
kind = ?info.kind,
holder_task = ?info.holder,
region_id = ?info.region,
duration_ns = info.duration,
acquired_at = %info.acquired_at,
"obligation leaked"
);
}
}
}
if let Some(region_record) = self.regions.get(info.region.arena_index()) {
region_record.resolve_obligation();
}
self.advance_region_state(info.region);
Ok(info.duration)
}
#[inline]
pub fn get_stored_future(&mut self, task_id: TaskId) -> Option<&mut StoredTask> {
self.tasks.get_stored_future(task_id)
}
#[inline]
pub fn remove_stored_future(&mut self, task_id: TaskId) -> Option<StoredTask> {
self.tasks.remove_stored_future(task_id)
}
#[inline]
pub fn store_spawned_task(&mut self, task_id: TaskId, stored: StoredTask) {
self.tasks.store_spawned_task(task_id, stored);
}
#[must_use]
pub fn live_task_count(&self) -> usize {
self.tasks_iter()
.filter(|(_, t)| !t.state.is_terminal())
.count()
}
#[must_use]
pub fn live_region_count(&self) -> usize {
self.regions_iter()
.filter(|(_, r)| !r.state().is_terminal())
.count()
}
#[inline]
#[must_use]
pub fn pending_obligation_count(&self) -> usize {
self.obligations.pending_count()
}
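    /// The runtime is quiescent when no live tasks remain, no obligations are
    /// pending, the I/O driver (if any) is empty, and no region still holds
    /// finalizers.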
#[must_use]
pub fn is_quiescent(&self) -> bool {
self.live_task_count() == 0
&& self.pending_obligation_count() == 0
&& self.io_driver.as_ref().is_none_or(IoDriverHandle::is_empty)
&& self.regions.iter().all(|(_, r)| r.finalizers_empty())
}
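    /// Asks the supervision policy how to react to a child's outcome; if the
    /// policy requests sibling cancellation, the siblings are cancelled
    /// immediately and returned alongside the action so the caller can schedule
    /// them.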
pub fn apply_policy_on_child_outcome<P: Policy<Error = crate::error::Error>>(
&mut self,
region: RegionId,
child: TaskId,
outcome: &Outcome<(), crate::error::Error>,
policy: &P,
) -> (PolicyAction, SmallVec<[(TaskId, u8); 4]>) {
let action = policy.on_child_outcome(child, outcome);
let tasks_to_schedule = if let PolicyAction::CancelSiblings(reason) = &action {
self.cancel_sibling_tasks(region, child, reason)
} else {
SmallVec::new()
};
(action, tasks_to_schedule)
}
fn cancel_sibling_tasks(
&mut self,
region: RegionId,
child: TaskId,
reason: &CancelReason,
) -> SmallVec<[(TaskId, u8); 4]> {
let Some(region_record) = self.regions.get(region.arena_index()) else {
return SmallVec::new();
};
let sibling_candidates = region_record.task_ids_small();
let mut tasks_to_cancel =
SmallVec::with_capacity(sibling_candidates.len().saturating_sub(1));
let now = self.current_runtime_time();
for &task_id in &sibling_candidates {
if task_id == child {
continue;
}
let budget = reason.cleanup_budget();
let (newly_cancelled, is_cancelling) = {
let Some(task_record) = self.task_mut(task_id) else {
continue;
};
let newly_cancelled =
task_record.request_cancel_with_budget(reason.clone(), budget);
let is_cancelling = task_record.state.is_cancelling();
(newly_cancelled, is_cancelling)
};
if newly_cancelled {
self.record_task_trace_event(task_id, |seq| {
TraceEvent::cancel_request(seq, now, task_id, region, reason.clone())
});
}
if newly_cancelled || is_cancelling {
tasks_to_cancel.push((task_id, budget.priority));
}
}
tasks_to_cancel
}
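    /// Requests cancellation of `region_id` and all of its descendants.
    ///
    /// Regions are visited parent-before-child (ordered by depth) so each
    /// child's `CancelReason` chains back to its parent's reason, every task in
    /// the affected regions has cancellation requested with the reason's cleanup
    /// budget, and the tasks that now need scheduling are returned with their
    /// cleanup priorities.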
#[allow(clippy::too_many_lines)]
pub fn cancel_request(
&mut self,
region_id: RegionId,
reason: &CancelReason,
source_task: Option<TaskId>,
) -> Vec<(TaskId, u8)> {
let mut tasks_to_cancel = Vec::with_capacity(32);
let cleanup_budget = reason.cleanup_budget();
#[cfg(not(feature = "tracing-integration"))]
let _ = (source_task, cleanup_budget);
let root_span = debug_span!(
"cancel_request",
target_region = ?region_id,
cancel_kind = ?reason.kind,
cancel_message = ?reason.message,
cleanup_poll_quota = cleanup_budget.poll_quota,
cleanup_priority = cleanup_budget.priority,
source_task = ?source_task
);
let _root_guard = root_span.enter();
debug!(
target_region = ?region_id,
cancel_kind = ?reason.kind,
cancel_message = ?reason.message,
cleanup_poll_quota = cleanup_budget.poll_quota,
cleanup_priority = cleanup_budget.priority,
source_task = ?source_task,
"cancel request initiated"
);
let now = self.current_runtime_time();
let mut regions_to_cancel = self.collect_region_and_descendants_with_depth(region_id);
regions_to_cancel.sort_by_key(|node| node.depth);
let mut region_reasons: HashMap<RegionId, CancelReason> =
HashMap::with_capacity(regions_to_cancel.len());
        for node in &regions_to_cancel {
let rid = node.id;
let region_reason = if rid == region_id {
reason.clone()
} else if let Some(parent_id) = node.parent {
let parent_reason = region_reasons
.get(&parent_id)
.cloned()
.unwrap_or_else(|| reason.clone());
CancelReason::parent_cancelled()
.with_region(parent_id)
.with_timestamp(reason.timestamp)
.with_cause_limited(parent_reason, &self.cancel_attribution)
} else {
CancelReason::parent_cancelled()
.with_timestamp(reason.timestamp)
.with_cause_limited(reason.clone(), &self.cancel_attribution)
};
region_reasons.insert(rid, region_reason.clone());
self.record_trace_event(|seq| {
TraceEvent::region_cancelled(seq, now, rid, region_reason.clone())
});
self.metrics.cancellation_requested(rid, region_reason.kind);
if let Some(parent) = node.parent {
#[cfg(not(feature = "tracing-integration"))]
let _ = parent;
let span = trace_span!(
"cancel_propagate_region",
from_region = ?parent,
to_region = ?rid,
depth = node.depth,
cancel_kind = ?region_reason.kind,
chain_depth = region_reason.chain_depth()
);
span.follows_from(&root_span);
let _guard = span.enter();
trace!(
from_region = ?parent,
to_region = ?rid,
depth = node.depth,
cancel_kind = ?region_reason.kind,
chain_depth = region_reason.chain_depth(),
root_cause = ?region_reason.root_cause().kind,
"cancel propagated to region with cause chain"
);
} else {
trace!(
target_region = ?rid,
depth = node.depth,
cancel_kind = ?region_reason.kind,
"cancel target region"
);
}
if let Some(region) = self.regions.get(rid.arena_index()) {
if region.begin_close(Some(region_reason.clone())) {
self.record_trace_event(|seq| {
TraceEvent::new(
seq,
now,
TraceEventKind::RegionCloseBegin,
TraceData::Region {
region: rid,
parent: node.parent,
},
)
});
} else if region.state() != crate::record::region::RegionState::Closed {
region.strengthen_cancel_reason(region_reason);
}
}
}
let mut task_id_buf = Vec::new();
        for node in &regions_to_cancel {
let rid = node.id;
task_id_buf.clear();
if let Some(region) = self.regions.get(rid.arena_index()) {
region.copy_task_ids_into(&mut task_id_buf);
}
let task_reason = region_reasons
.get(&rid)
.cloned()
.unwrap_or_else(|| reason.clone());
for &task_id in &task_id_buf {
if let Some(task) = self.task_mut(task_id) {
let task_budget = task_reason.cleanup_budget();
let newly_cancelled =
task.request_cancel_with_budget(task_reason.clone(), task_budget);
let already_cancelling = task.state.is_cancelling();
let cancel_kind = task.cancel_reason().map(|r| r.kind);
#[cfg(not(feature = "tracing-integration"))]
let _ = cancel_kind;
if newly_cancelled {
self.record_task_trace_event(task_id, |seq| {
TraceEvent::cancel_request(seq, now, task_id, rid, task_reason.clone())
});
}
let span = trace_span!(
"cancel_propagate_task",
from_region = ?rid,
to_task = ?task_id,
depth = node.depth,
cancel_kind = ?cancel_kind,
chain_depth = task_reason.chain_depth()
);
span.follows_from(&root_span);
let _guard = span.enter();
trace!(
from_region = ?rid,
to_task = ?task_id,
depth = node.depth,
newly_cancelled,
already_cancelling,
cleanup_poll_quota = task_budget.poll_quota,
cleanup_priority = task_budget.priority,
chain_depth = task_reason.chain_depth(),
root_cause = ?task_reason.root_cause().kind,
"cancel propagated to task with cause chain"
);
if newly_cancelled {
tasks_to_cancel.push((task_id, task_budget.priority));
} else if already_cancelling {
tasks_to_cancel.push((task_id, task_budget.priority));
}
}
}
}
        for node in &regions_to_cancel {
let Some(region) = self.regions.get(node.id.arena_index()) else {
continue;
};
let no_children = region.child_count() == 0;
let no_tasks = region.task_count() == 0;
if no_children && no_tasks {
self.advance_region_state(node.id);
}
}
tasks_to_cancel
}
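    /// Depth-first collection of `region_id` and its descendants, recording each
    /// node's parent and depth so cancellation can be applied parent-before-child.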
fn collect_region_and_descendants_with_depth(
&self,
region_id: RegionId,
) -> Vec<CancelRegionNode> {
let mut result = Vec::new();
let mut stack = Vec::new();
let mut child_buf = Vec::new();
stack.push((region_id, None, 0usize));
while let Some((rid, parent, depth)) = stack.pop() {
result.push(CancelRegionNode {
id: rid,
parent,
depth,
});
if let Some(region) = self.regions.get(rid.arena_index()) {
child_buf.clear();
region.copy_child_ids_into(&mut child_buf);
for &child_id in &child_buf {
stack.push((child_id, Some(rid), depth + 1));
}
}
}
result
}
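    /// True once every task owned by the region is terminal and every child
    /// region has reached a terminal state.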
#[must_use]
pub fn can_region_finalize(&self, region_id: RegionId) -> bool {
let Some(region) = self.regions.get(region_id.arena_index()) else {
return false;
};
let all_tasks_done = region
.task_ids()
.iter()
.all(|&task_id| self.task(task_id).is_none_or(|t| t.state.is_terminal()));
let all_children_closed = region.child_ids().iter().all(|&child_id| {
self.regions
.get(child_id.arena_index())
.is_none_or(|r| r.state().is_terminal())
});
all_tasks_done && all_children_closed
}
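    /// Final bookkeeping for a completed task: validates the protocol transition,
    /// records trace and metrics, reports still-pending obligations as leaks
    /// (unless the task was cancelled), aborts any orphaned obligations, clears
    /// any async-finalizer barrier the task held, removes the task from its
    /// region, and returns the waiters that should be woken.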
pub fn task_completed(&mut self, task_id: TaskId) -> SmallVec<[TaskId; 4]> {
let (owner, completion, outcome_kind) = {
let Some(task) = self.task(task_id) else {
trace!(
task_id = ?task_id,
"task_completed called for unknown task"
);
return SmallVec::new();
};
let context = TaskContext {
task_id,
region_id: task.owner,
spawned_at: task.created_at,
validation_level: CancelValidationLevel::Basic,
};
let validation_result =
self.validate_task_protocol_transition(task_id, TaskEvent::Complete, &context);
if matches!(
validation_result,
TransitionResult::Invalid { .. } | TransitionResult::InvariantViolation { .. }
) {
log_cancel_protocol_violation("task completion", &validation_result);
}
if let Some(inner) = task.cx_inner.as_ref() {
if inner.read().cancel_waker.is_some() {
inner.write().cancel_waker = None;
}
}
self.record_task_complete(task);
let outcome_kind = match &task.state {
crate::record::task::TaskState::Completed(outcome) => match outcome {
Outcome::Ok(()) => "Ok",
Outcome::Err(_) => "Err",
Outcome::Cancelled(_) => "Cancelled",
Outcome::Panicked(_) => "Panicked",
},
_ => "Unknown",
};
let owner = task.owner;
let completion = TaskCompletionKind::from_state(&task.state);
(owner, completion, outcome_kind)
};
let waiters = self
.task_mut(task_id)
.map(|task| std::mem::take(&mut task.waiters))
.unwrap_or_default();
let waiter_count = waiters.len();
#[cfg(not(feature = "tracing-integration"))]
let _ = (outcome_kind, waiter_count);
if !matches!(completion, TaskCompletionKind::Cancelled) {
let leaks = self.collect_obligation_leaks_for_holder(task_id);
if !leaks.is_empty() {
self.handle_obligation_leaks(ObligationLeakError {
task_id: Some(task_id),
region_id: owner,
completion: Some(completion),
leaks,
});
}
}
if let Some(finalizer_id) = self.async_finalizer_tasks.remove(&task_id) {
let should_clear_barrier = self
.active_async_finalizers
.get(&owner)
.is_some_and(|active_task| *active_task == task_id);
if should_clear_barrier {
self.active_async_finalizers.remove(&owner);
}
self.record_finalizer_run(finalizer_id);
}
debug!(
task_id = ?task_id,
region_id = ?owner,
outcome_kind = outcome_kind,
waiter_count = waiter_count,
"task cleanup from runtime state"
);
let orphaned = self.obligations.sorted_pending_ids_for_holder(task_id);
for ob_id in orphaned {
let _ = self.abort_obligation(ob_id, ObligationAbortReason::Cancel);
}
let _ = self.remove_task(task_id);
if let Some(region) = self.regions.get(owner.arena_index()) {
region.remove_task(task_id);
}
self.advance_region_state(owner);
waiters
}
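    /// For each finalizing region without an async finalizer already in flight,
    /// runs queued sync finalizers and spawns the next async finalizer (if any)
    /// as a masked cleanup task; returns the spawned task ids with their
    /// scheduling priorities.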
pub fn drain_ready_async_finalizers(&mut self) -> SmallVec<[(TaskId, u8); 2]> {
if self.finalizing_regions.is_empty() {
return SmallVec::new();
}
let mut scheduled = SmallVec::new();
let mut regions_to_process = SmallVec::<[RegionId; 8]>::new();
        for &region_id in &self.finalizing_regions {
            if self.active_async_finalizers.contains_key(&region_id) {
continue;
}
if let Some(region) = self.regions.get(region_id.arena_index()) {
if !region.finalizers_empty() {
regions_to_process.push(region_id);
}
}
}
for region_id in regions_to_process {
let Some((finalizer_id, finalizer)) = self.run_sync_finalizers_tracked(region_id)
else {
continue;
};
let Finalizer::Async(future) = finalizer else {
continue;
};
match self.spawn_finalizer_task(region_id, finalizer_id, future) {
Ok((task_id, priority)) => scheduled.push((task_id, priority)),
Err(future) => {
if let Some(region) = self.regions.get(region_id.arena_index()) {
region.add_finalizer(Finalizer::Async(future));
}
self.pending_finalizer_ids
.entry(region_id)
.or_default()
.push(finalizer_id);
}
}
}
scheduled
}
fn spawn_finalizer_task(
&mut self,
region_id: RegionId,
finalizer_id: u64,
future: BoxedAsyncFinalizer,
) -> Result<(TaskId, u8), BoxedAsyncFinalizer> {
let deadline = self
.current_runtime_time()
.saturating_add_nanos(FINALIZER_TIME_BUDGET_NANOS);
let budget = finalizer_budget().with_deadline(deadline);
let Ok((task_id, _handle, cx, result_tx)) =
self.create_task_infrastructure::<()>(region_id, budget, true)
else {
return Err(future);
};
let cx_inner = Arc::clone(&cx.inner);
let masked = MaskedFinalizer::new(future, cx_inner);
let wrapped_future = async move {
match (CatchUnwind { inner: masked }).await {
Ok(()) => {
let _ = result_tx.send(&cx, Ok::<_, JoinError>(()));
Outcome::Ok(())
}
Err(payload) => {
let panic_payload =
crate::types::outcome::PanicPayload::new(payload_to_string(&payload));
let _ = result_tx.send(
&cx,
Err::<(), JoinError>(JoinError::Panicked(panic_payload.clone())),
);
Outcome::Panicked(panic_payload)
}
}
};
self.tasks
.store_spawned_task(task_id, StoredTask::new_with_id(wrapped_future, task_id));
if let Some(record) = self.task(task_id) {
record.wake_state.notify();
}
self.async_finalizer_tasks.insert(task_id, finalizer_id);
let previous = self.active_async_finalizers.insert(region_id, task_id);
debug_assert!(
previous.is_none(),
"region {:?} already had an active async finalizer barrier: {:?}",
region_id,
previous
);
Ok((task_id, budget.priority))
}
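    /// Registers a synchronous finalizer to run when `region_id` closes; returns
    /// `false` if the region is already closing or terminal.
    ///
    /// Minimal sketch (illustrative only):
    /// ```rust,ignore
    /// let accepted = state.register_sync_finalizer(region, || {
    ///     // release an external resource
    /// });
    /// assert!(accepted);
    /// ```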
pub fn register_sync_finalizer<F>(&mut self, region_id: RegionId, f: F) -> bool
where
F: FnOnce() + Send + 'static,
{
let accepts_finalizers = self
.regions
.get(region_id.arena_index())
.is_some_and(|region| !region.state().is_closing() && !region.state().is_terminal());
if !accepts_finalizers {
return false;
}
let finalizer_id = self.allocate_finalizer_id();
{
let Some(region) = self.regions.get(region_id.arena_index()) else {
return false;
};
region.add_finalizer(Finalizer::Sync(Box::new(f)));
}
self.record_finalizer_registration(finalizer_id, region_id);
let cancel_reason = CancelReason::user("sync_finalizer_registration");
self.debt_monitor.queue_work(
crate::observability::WorkType::RegionCleanup,
format!("sync_finalizer_{finalizer_id}_{region_id}"),
5, 2, &cancel_reason,
CancelKind::Shutdown,
Vec::new(),
);
true
}
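    /// Async counterpart of `register_sync_finalizer`: the future is boxed and
    /// later run as a masked finalizer task when the region finalizes.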
pub fn register_async_finalizer<F>(&mut self, region_id: RegionId, future: F) -> bool
where
F: Future<Output = ()> + Send + 'static,
{
let accepts_finalizers = self
.regions
.get(region_id.arena_index())
.is_some_and(|region| !region.state().is_closing() && !region.state().is_terminal());
if !accepts_finalizers {
return false;
}
let finalizer_id = self.allocate_finalizer_id();
{
let Some(region) = self.regions.get(region_id.arena_index()) else {
return false;
};
region.add_finalizer(Finalizer::Async(Box::pin(future)));
}
self.record_finalizer_registration(finalizer_id, region_id);
let cancel_reason = CancelReason::user("async_finalizer_registration");
self.debt_monitor.queue_work(
crate::observability::WorkType::RegionCleanup,
format!("async_finalizer_{finalizer_id}_{region_id}"),
6, 3, &cancel_reason,
CancelKind::Shutdown,
Vec::new(),
);
true
}
fn allocate_finalizer_id(&mut self) -> u64 {
let id = self.next_finalizer_id;
self.next_finalizer_id = self
.next_finalizer_id
.checked_add(1)
.expect("finalizer ID overflow");
id
}
fn record_finalizer_registration(&mut self, id: u64, region: RegionId) {
let now = self.current_runtime_time();
self.pending_finalizer_ids
.entry(region)
.or_default()
.push(id);
self.finalizer_history
.push(FinalizerHistoryEvent::Registered {
id,
region,
time: now,
});
}
fn record_finalizer_run(&mut self, id: u64) {
let now = self.current_runtime_time();
self.finalizer_history
.push(FinalizerHistoryEvent::Ran { id, time: now });
}
fn record_finalizer_close(&mut self, region: RegionId) {
let now = self.current_runtime_time();
        self.pending_finalizer_ids.remove(&region);
self.finalizer_history
.push(FinalizerHistoryEvent::RegionClosed { region, time: now });
}
fn pop_tracked_finalizer(&mut self, region_id: RegionId) -> Option<(u64, Finalizer)> {
let finalizer = {
let region = self.regions.get(region_id.arena_index())?;
region.pop_finalizer()?
};
let (id, empty_after_pop) = {
let ids = self
.pending_finalizer_ids
                .get_mut(&region_id)
.expect("finalizer id tracking missing for region");
let id = ids.pop().expect("finalizer id stack out of sync");
(id, ids.is_empty())
};
if empty_after_pop {
            self.pending_finalizer_ids.remove(&region_id);
}
Some((id, finalizer))
}
pub fn pop_region_finalizer(&mut self, region_id: RegionId) -> Option<Finalizer> {
self.pop_tracked_finalizer(region_id)
.map(|(_, finalizer)| finalizer)
}
#[must_use]
pub fn region_finalizer_count(&self, region_id: RegionId) -> usize {
self.regions
.get(region_id.arena_index())
.map_or(0, RegionRecord::finalizer_count)
}
#[must_use]
pub fn region_finalizers_empty(&self, region_id: RegionId) -> bool {
self.regions
.get(region_id.arena_index())
.is_none_or(RegionRecord::finalizers_empty)
}
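/// Runs queued synchronous finalizers for `region_id`, stopping when an
/// asynchronous finalizer is popped; that finalizer is returned to the
/// caller instead of being executed inline.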
pub fn run_sync_finalizers(&mut self, region_id: RegionId) -> Option<Finalizer> {
self.run_sync_finalizers_tracked(region_id)
.map(|(_, finalizer)| finalizer)
}
fn run_sync_finalizers_tracked(&mut self, region_id: RegionId) -> Option<(u64, Finalizer)> {
loop {
let (finalizer_id, finalizer) = self.pop_tracked_finalizer(region_id)?;
match finalizer {
Finalizer::Sync(f) => {
// A panic inside a sync finalizer is caught and discarded so the
// remaining finalizers for this region still run.
let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(f));
self.record_finalizer_run(finalizer_id);
}
Finalizer::Async(_) => {
return Some((finalizer_id, finalizer));
}
}
}
}
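/// Returns `true` when `region_id` may complete its close: no pending
/// finalizers, no live tasks, no outstanding obligations, and no child
/// regions. A region that is already `Closed` reports `true`; any state
/// other than `Finalizing` reports `false`.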
#[must_use]
pub fn can_region_complete_close(&self, region_id: RegionId) -> bool {
let Some(region) = self.regions.get(region_id.arena_index()) else {
return false;
};
if region.state() == crate::record::region::RegionState::Closed {
return true;
}
if region.state() != crate::record::region::RegionState::Finalizing {
return false;
}
if !region.finalizers_empty() {
return false;
}
if region.task_count() > 0 {
return false;
}
if region.pending_obligations() > 0 {
return false;
}
if region.child_count() > 0 {
return false;
}
true
}
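/// Walks the region close state machine starting at `initial_region`,
/// propagating upward through parents as regions finish closing.
///
/// `Closing`/`Draining` regions with no children and no tasks move to
/// `Finalizing`; finalizing regions run their sync finalizers (deferring to
/// an async finalizer task when one is found), surface leaked obligations,
/// and finally complete the close, at which point the parent is revisited.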
#[allow(clippy::too_many_lines)]
pub fn advance_region_state(&mut self, initial_region: RegionId) {
let mut current = Some(initial_region);
while let Some(region_id) = current.take() {
let (state, parent) = {
let Some(region) = self.regions.get(region_id.arena_index()) else {
break;
};
(region.state(), region.parent)
};
match state {
crate::record::region::RegionState::Closing
| crate::record::region::RegionState::Draining => {
let transition_to_finalizing = {
let Some(region) = self.regions.get(region_id.arena_index()) else {
break;
};
let no_children = region.child_count() == 0;
let no_tasks = region.task_count() == 0;
if no_children && no_tasks {
let context = RegionContext {
region_id,
parent_region: region.parent,
created_at: region.created_at,
validation_level: CancelValidationLevel::Basic,
};
let validation_result = self.validate_region_protocol_transition(
region_id,
RegionEvent::RequestClose,
&context,
);
if matches!(
validation_result,
TransitionResult::Invalid { .. }
| TransitionResult::InvariantViolation { .. }
) {
log_cancel_protocol_violation(
"region finalize transition",
&validation_result,
);
}
region.begin_finalize()
} else {
if !no_children
&& region.state() == crate::record::region::RegionState::Closing
{
let context = RegionContext {
region_id,
parent_region: region.parent,
created_at: region.created_at,
validation_level: CancelValidationLevel::Basic,
};
let validation_result = self.validate_region_protocol_transition(
region_id,
RegionEvent::Cancel {
reason: "draining children".to_string(),
},
&context,
);
if matches!(
validation_result,
TransitionResult::Invalid { .. }
| TransitionResult::InvariantViolation { .. }
) {
log_cancel_protocol_violation(
"region drain transition",
&validation_result,
);
}
region.begin_drain();
self.notify_runtime_epoch_advance(
super::epoch_tracker::ModuleId::RegionTable,
);
}
false
}
};
if transition_to_finalizing {
self.notify_runtime_epoch_advance(
super::epoch_tracker::ModuleId::RegionTable,
);
self.finalizing_regions.push(region_id);
current = Some(region_id);
}
}
crate::record::region::RegionState::Finalizing => {
if self.active_async_finalizers.contains_key(&region_id) {
break;
}
if let Some((finalizer_id, async_finalizer)) =
self.run_sync_finalizers_tracked(region_id)
{
if let Some(region) = self.regions.get(region_id.arena_index()) {
region.add_finalizer(async_finalizer);
}
self.pending_finalizer_ids
.entry(region_id)
.or_default()
.push(finalizer_id);
break;
}
if let Some(region) = self.regions.get(region_id.arena_index()) {
if region.pending_obligations() > 0 {
if region.task_count() == 0 {
let leaks = self
.collect_obligation_leaks(|record| record.region == region_id);
if !leaks.is_empty() {
self.handle_obligation_leaks(ObligationLeakError {
task_id: None,
region_id,
completion: None,
leaks,
});
}
}
}
}
if self.can_region_complete_close(region_id) {
let closed = {
let Some(region) = self.regions.get(region_id.arena_index()) else {
break;
};
let context = RegionContext {
region_id,
parent_region: region.parent,
created_at: region.created_at,
validation_level: CancelValidationLevel::Basic,
};
let validation_result = self.validate_region_protocol_transition(
region_id,
RegionEvent::FinalizerCompleted,
&context,
);
if matches!(
validation_result,
TransitionResult::Invalid { .. }
| TransitionResult::InvariantViolation { .. }
) {
log_cancel_protocol_violation(
"region close completion",
&validation_result,
);
}
region.complete_close()
};
if closed {
if let Some(pos) =
self.finalizing_regions.iter().position(|&r| r == region_id)
{
self.finalizing_regions.swap_remove(pos);
}
self.record_finalizer_close(region_id);
let now = self.current_runtime_time();
self.record_trace_event(|seq| {
TraceEvent::new(
seq,
now,
TraceEventKind::RegionCloseComplete,
TraceData::Region {
region: region_id,
parent,
},
)
});
if let Some(region) = self.regions.get(region_id.arena_index()) {
let lifetime =
Duration::from_nanos(now.duration_since(region.created_at()));
self.metrics.region_closed(region_id, lifetime);
}
if let Some(parent_id) = parent {
if let Some(parent_record) =
self.regions.get(parent_id.arena_index())
{
parent_record.remove_child(region_id);
}
current = Some(parent_id);
}
self.remember_closed_region(region_id);
self.regions.remove(region_id.arena_index());
self.notify_runtime_epoch_advance(
super::epoch_tracker::ModuleId::RegionTable,
);
}
}
}
_ => {}
}
}
}
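/// Records `region_id` in the bounded set of recently closed regions,
/// evicting the oldest entries once `RECENTLY_CLOSED_REGION_CAPACITY` is
/// exceeded.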
fn remember_closed_region(&mut self, region_id: RegionId) {
if !self.recently_closed_regions.insert(region_id) {
return;
}
self.recently_closed_region_order.push_back(region_id);
while self.recently_closed_region_order.len() > Self::RECENTLY_CLOSED_REGION_CAPACITY {
if let Some(evicted) = self.recently_closed_region_order.pop_front() {
self.recently_closed_regions.remove(&evicted);
}
}
}
pub(crate) fn finalizer_history(&self) -> &[FinalizerHistoryEvent] {
&self.finalizer_history
}
#[cfg(test)]
pub(crate) fn record_finalizer_close_for_test(&mut self, region: RegionId) {
self.record_finalizer_close(region);
}
pub fn resource_monitor(&self) -> Arc<ResourceMonitor> {
Arc::clone(&self.resource_monitor)
}
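/// Updates the resource-monitor priority for `region_id`, returning `false`
/// if the region no longer exists.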
pub fn set_region_priority(&mut self, region_id: RegionId, priority: RegionPriority) -> bool {
if self.regions.get(region_id.arena_index()).is_none() {
return false;
}
self.resource_monitor
.engine()
.set_region_priority(region_id, priority);
true
}
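/// Returns `true` while composite resource pressure is at `None` or `Light`
/// degradation, i.e. while the runtime is still willing to accept new work.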
pub fn should_accept_new_work(&self) -> bool {
let composite_level = self
.resource_monitor
.pressure()
.composite_degradation_level();
matches!(
composite_level,
DegradationLevel::None | DegradationLevel::Light
)
}
pub fn degradation_stats(&self) -> DegradationStatsSnapshot {
self.resource_monitor.engine().stats()
}
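/// Decides whether a new region may be created at `priority` under the
/// current composite degradation level.
///
/// `Critical` and `High` priority regions are never shed; `Normal` regions
/// are shed at `Heavy` or `Emergency` pressure; `Low` and `BestEffort`
/// regions are shed from `Moderate` pressure upward.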
pub fn check_resource_pressure_for_region(
&self,
priority: RegionPriority,
) -> Result<(), RegionCreateError> {
let composite_level = self
.resource_monitor
.pressure()
.composite_degradation_level();
let should_shed = match (composite_level, priority) {
(_, RegionPriority::Critical | RegionPriority::High) => false,
(DegradationLevel::Heavy | DegradationLevel::Emergency, RegionPriority::Normal) => true,
(
DegradationLevel::Moderate | DegradationLevel::Heavy | DegradationLevel::Emergency,
RegionPriority::Low | RegionPriority::BestEffort,
) => true,
_ => false,
};
if should_shed {
Err(RegionCreateError::ResourcePressure {
requested_priority: priority,
reason: format!(
"Resource pressure level {:?} prevents region creation at priority {:?}",
composite_level, priority
),
})
} else {
Ok(())
}
}
}
impl Default for RuntimeState {
fn default() -> Self {
Self::new()
}
}
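/// Serializable (index, generation) pair capturing a region, task, or
/// obligation id in a snapshot.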
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub struct IdSnapshot {
pub index: u32,
pub generation: u32,
}
impl From<RegionId> for IdSnapshot {
fn from(id: RegionId) -> Self {
let arena = id.arena_index();
Self {
index: arena.index(),
generation: arena.generation(),
}
}
}
impl From<TaskId> for IdSnapshot {
fn from(id: TaskId) -> Self {
let arena = id.arena_index();
Self {
index: arena.index(),
generation: arena.generation(),
}
}
}
impl From<ObligationId> for IdSnapshot {
fn from(id: ObligationId) -> Self {
let arena = id.arena_index();
Self {
index: arena.index(),
generation: arena.generation(),
}
}
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct BudgetSnapshot {
pub deadline: Option<u64>,
pub poll_quota: u32,
pub cost_quota: Option<u64>,
pub priority: u8,
}
impl From<Budget> for BudgetSnapshot {
fn from(budget: Budget) -> Self {
Self {
deadline: budget.deadline.map(Time::as_nanos),
poll_quota: budget.poll_quota,
cost_quota: budget.cost_quota,
priority: budget.priority,
}
}
}
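/// Point-in-time, serializable view of the runtime's regions, tasks,
/// obligations, and most recent trace events.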
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RuntimeSnapshot {
pub timestamp: u64,
pub regions: Vec<RegionSnapshot>,
pub tasks: Vec<TaskSnapshot>,
pub obligations: Vec<ObligationSnapshot>,
pub recent_events: Vec<EventSnapshot>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegionSnapshot {
pub id: IdSnapshot,
pub parent_id: Option<IdSnapshot>,
pub state: RegionStateSnapshot,
pub budget: BudgetSnapshot,
pub child_count: usize,
pub task_count: usize,
pub name: Option<String>,
}
impl RegionSnapshot {
fn from_record(record: &RegionRecord) -> Self {
let child_count = record.child_count();
let task_count = record.task_count();
Self {
id: record.id.into(),
parent_id: record.parent.map(IdSnapshot::from),
state: RegionStateSnapshot::from(record.state()),
budget: BudgetSnapshot::from(record.budget()),
child_count,
task_count,
name: None,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RegionStateSnapshot {
Open,
Closing,
Draining,
Finalizing,
Closed,
}
impl From<RegionState> for RegionStateSnapshot {
fn from(state: RegionState) -> Self {
match state {
RegionState::Open => Self::Open,
RegionState::Closing => Self::Closing,
RegionState::Draining => Self::Draining,
RegionState::Finalizing => Self::Finalizing,
RegionState::Closed => Self::Closed,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskSnapshot {
pub id: IdSnapshot,
pub region_id: IdSnapshot,
pub state: TaskStateSnapshot,
pub name: Option<String>,
pub poll_count: u64,
pub created_at: u64,
pub obligations: Vec<IdSnapshot>,
}
impl TaskSnapshot {
fn from_record(record: &TaskRecord, obligations: Vec<ObligationId>) -> Self {
let poll_count = record
.cx_inner
.as_ref()
.map(|inner| inner.read())
.map(|inner| inner.budget_baseline.poll_quota)
.map_or(0, |baseline| {
u64::from(baseline.saturating_sub(record.polls_remaining))
});
let obligations = obligations.into_iter().map(IdSnapshot::from).collect();
Self {
id: record.id.into(),
region_id: record.owner.into(),
state: TaskStateSnapshot::from_state(&record.state),
name: None,
poll_count,
created_at: record.created_at().as_nanos(),
obligations,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TaskStateSnapshot {
Created,
Running,
CancelRequested {
reason: CancelReasonSnapshot,
},
Cancelling {
reason: CancelReasonSnapshot,
},
Finalizing {
reason: CancelReasonSnapshot,
},
Completed {
outcome: OutcomeSnapshot,
},
}
impl TaskStateSnapshot {
fn from_state(state: &TaskState) -> Self {
match state {
TaskState::Created => Self::Created,
TaskState::Running => Self::Running,
TaskState::CancelRequested { reason, .. } => Self::CancelRequested {
reason: CancelReasonSnapshot::from(reason),
},
TaskState::Cancelling { reason, .. } => Self::Cancelling {
reason: CancelReasonSnapshot::from(reason),
},
TaskState::Finalizing { reason, .. } => Self::Finalizing {
reason: CancelReasonSnapshot::from(reason),
},
TaskState::Completed(outcome) => Self::Completed {
outcome: OutcomeSnapshot::from_outcome(outcome),
},
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CancelKindSnapshot {
User,
Timeout,
Deadline,
PollQuota,
CostBudget,
FailFast,
RaceLost,
ParentCancelled,
ResourceUnavailable,
Shutdown,
LinkedExit,
}
impl From<CancelKind> for CancelKindSnapshot {
fn from(kind: CancelKind) -> Self {
match kind {
CancelKind::User => Self::User,
CancelKind::Timeout => Self::Timeout,
CancelKind::Deadline => Self::Deadline,
CancelKind::PollQuota => Self::PollQuota,
CancelKind::CostBudget => Self::CostBudget,
CancelKind::FailFast => Self::FailFast,
CancelKind::RaceLost => Self::RaceLost,
CancelKind::ParentCancelled => Self::ParentCancelled,
CancelKind::ResourceUnavailable => Self::ResourceUnavailable,
CancelKind::Shutdown => Self::Shutdown,
CancelKind::LinkedExit => Self::LinkedExit,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CancelReasonSnapshot {
pub kind: CancelKindSnapshot,
pub origin_region: IdSnapshot,
pub origin_task: Option<IdSnapshot>,
pub timestamp: u64,
pub message: Option<String>,
pub cause: Option<Box<Self>>,
}
impl From<&CancelReason> for CancelReasonSnapshot {
fn from(reason: &CancelReason) -> Self {
Self {
kind: CancelKindSnapshot::from(reason.kind()),
origin_region: reason.origin_region.into(),
origin_task: reason.origin_task.map(IdSnapshot::from),
timestamp: reason.timestamp.as_nanos(),
message: reason.message.clone(),
cause: reason
.cause
.as_deref()
.map(|cause| Box::new(Self::from(cause))),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum OutcomeSnapshot {
Ok,
Err {
message: Option<String>,
},
Cancelled {
reason: CancelReasonSnapshot,
},
Panicked {
message: Option<String>,
},
}
impl OutcomeSnapshot {
fn from_outcome(outcome: &Outcome<(), crate::error::Error>) -> Self {
match outcome {
Outcome::Ok(()) => Self::Ok,
Outcome::Err(err) => Self::Err {
message: Some(err.to_string()),
},
Outcome::Cancelled(reason) => Self::Cancelled {
reason: CancelReasonSnapshot::from(reason),
},
Outcome::Panicked(payload) => Self::Panicked {
message: Some(payload.message().to_string()),
},
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DownReasonSnapshot {
Normal,
Error {
message: String,
},
Cancelled {
reason: CancelReasonSnapshot,
},
Panicked {
message: String,
},
}
impl From<&crate::monitor::DownReason> for DownReasonSnapshot {
fn from(reason: &crate::monitor::DownReason) -> Self {
match reason {
crate::monitor::DownReason::Normal => Self::Normal,
crate::monitor::DownReason::Error(message) => Self::Error {
message: message.clone(),
},
crate::monitor::DownReason::Cancelled(reason) => Self::Cancelled {
reason: CancelReasonSnapshot::from(reason),
},
crate::monitor::DownReason::Panicked(payload) => Self::Panicked {
message: payload.message().to_string(),
},
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ObligationSnapshot {
pub id: IdSnapshot,
pub kind: ObligationKindSnapshot,
pub state: ObligationStateSnapshot,
pub holder_task: IdSnapshot,
pub owning_region: IdSnapshot,
pub created_at: u64,
}
impl ObligationSnapshot {
fn from_record(record: &ObligationRecord) -> Self {
Self {
id: record.id.into(),
kind: ObligationKindSnapshot::from(record.kind),
state: ObligationStateSnapshot::from(record.state),
holder_task: record.holder.into(),
owning_region: record.region.into(),
created_at: record.reserved_at.as_nanos(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ObligationKindSnapshot {
SendPermit,
Ack,
Lease,
IoOp,
SemaphorePermit,
}
impl From<ObligationKind> for ObligationKindSnapshot {
fn from(kind: ObligationKind) -> Self {
match kind {
ObligationKind::SendPermit => Self::SendPermit,
ObligationKind::Ack => Self::Ack,
ObligationKind::Lease => Self::Lease,
ObligationKind::IoOp => Self::IoOp,
ObligationKind::SemaphorePermit => Self::SemaphorePermit,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ObligationStateSnapshot {
Reserved,
Committed,
Aborted,
Leaked,
}
impl From<ObligationState> for ObligationStateSnapshot {
fn from(state: ObligationState) -> Self {
match state {
ObligationState::Reserved => Self::Reserved,
ObligationState::Committed => Self::Committed,
ObligationState::Aborted => Self::Aborted,
ObligationState::Leaked => Self::Leaked,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ObligationAbortReasonSnapshot {
Cancel,
Error,
Explicit,
}
impl From<ObligationAbortReason> for ObligationAbortReasonSnapshot {
fn from(reason: ObligationAbortReason) -> Self {
match reason {
ObligationAbortReason::Cancel => Self::Cancel,
ObligationAbortReason::Error => Self::Error,
ObligationAbortReason::Explicit => Self::Explicit,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventSnapshot {
pub version: u32,
pub seq: u64,
pub time: u64,
pub kind: EventKindSnapshot,
pub data: EventDataSnapshot,
}
impl EventSnapshot {
fn from_event(event: &TraceEvent) -> Self {
Self {
version: event.version,
seq: event.seq,
time: event.time.as_nanos(),
kind: EventKindSnapshot::from(event.kind),
data: EventDataSnapshot::from_trace_data(&event.data),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum EventKindSnapshot {
Spawn,
Schedule,
Yield,
Wake,
Poll,
Complete,
CancelRequest,
CancelAck,
WorkerCancelRequested,
WorkerCancelAcknowledged,
WorkerDrainStarted,
WorkerDrainCompleted,
WorkerFinalizeCompleted,
RegionCloseBegin,
RegionCloseComplete,
RegionCreated,
RegionCancelled,
ObligationReserve,
ObligationCommit,
ObligationAbort,
ObligationLeak,
TimeAdvance,
TimerScheduled,
TimerFired,
TimerCancelled,
IoRequested,
IoReady,
IoResult,
IoError,
RngSeed,
RngValue,
Checkpoint,
FuturelockDetected,
ChaosInjection,
UserTrace,
MonitorCreated,
MonitorDropped,
DownDelivered,
LinkCreated,
LinkDropped,
ExitDelivered,
}
impl From<TraceEventKind> for EventKindSnapshot {
fn from(kind: TraceEventKind) -> Self {
match kind {
TraceEventKind::Spawn => Self::Spawn,
TraceEventKind::Schedule => Self::Schedule,
TraceEventKind::Yield => Self::Yield,
TraceEventKind::Wake => Self::Wake,
TraceEventKind::Poll => Self::Poll,
TraceEventKind::Complete => Self::Complete,
TraceEventKind::CancelRequest => Self::CancelRequest,
TraceEventKind::CancelAck => Self::CancelAck,
TraceEventKind::WorkerCancelRequested => Self::WorkerCancelRequested,
TraceEventKind::WorkerCancelAcknowledged => Self::WorkerCancelAcknowledged,
TraceEventKind::WorkerDrainStarted => Self::WorkerDrainStarted,
TraceEventKind::WorkerDrainCompleted => Self::WorkerDrainCompleted,
TraceEventKind::WorkerFinalizeCompleted => Self::WorkerFinalizeCompleted,
TraceEventKind::RegionCloseBegin => Self::RegionCloseBegin,
TraceEventKind::RegionCloseComplete => Self::RegionCloseComplete,
TraceEventKind::RegionCreated => Self::RegionCreated,
TraceEventKind::RegionCancelled => Self::RegionCancelled,
TraceEventKind::ObligationReserve => Self::ObligationReserve,
TraceEventKind::ObligationCommit => Self::ObligationCommit,
TraceEventKind::ObligationAbort => Self::ObligationAbort,
TraceEventKind::ObligationLeak => Self::ObligationLeak,
TraceEventKind::TimeAdvance => Self::TimeAdvance,
TraceEventKind::TimerScheduled => Self::TimerScheduled,
TraceEventKind::TimerFired => Self::TimerFired,
TraceEventKind::TimerCancelled => Self::TimerCancelled,
TraceEventKind::IoRequested => Self::IoRequested,
TraceEventKind::IoReady => Self::IoReady,
TraceEventKind::IoResult => Self::IoResult,
TraceEventKind::IoError => Self::IoError,
TraceEventKind::RngSeed => Self::RngSeed,
TraceEventKind::RngValue => Self::RngValue,
TraceEventKind::Checkpoint => Self::Checkpoint,
TraceEventKind::FuturelockDetected => Self::FuturelockDetected,
TraceEventKind::ChaosInjection => Self::ChaosInjection,
TraceEventKind::UserTrace => Self::UserTrace,
TraceEventKind::MonitorCreated => Self::MonitorCreated,
TraceEventKind::MonitorDropped => Self::MonitorDropped,
TraceEventKind::DownDelivered => Self::DownDelivered,
TraceEventKind::LinkCreated => Self::LinkCreated,
TraceEventKind::LinkDropped => Self::LinkDropped,
TraceEventKind::ExitDelivered => Self::ExitDelivered,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum EventDataSnapshot {
None,
Task {
task: IdSnapshot,
region: IdSnapshot,
},
Region {
region: IdSnapshot,
parent: Option<IdSnapshot>,
},
Obligation {
obligation: IdSnapshot,
task: IdSnapshot,
region: IdSnapshot,
kind: ObligationKindSnapshot,
state: ObligationStateSnapshot,
duration_ns: Option<u64>,
abort_reason: Option<ObligationAbortReasonSnapshot>,
},
Cancel {
task: IdSnapshot,
region: IdSnapshot,
reason: CancelReasonSnapshot,
},
RegionCancel {
region: IdSnapshot,
reason: CancelReasonSnapshot,
},
Time {
old: u64,
new: u64,
},
Timer {
timer_id: u64,
deadline: Option<u64>,
},
IoRequested {
token: u64,
interest: u8,
},
IoReady {
token: u64,
readiness: u8,
},
IoResult {
token: u64,
bytes: i64,
},
IoError {
token: u64,
kind: u8,
},
RngSeed {
seed: u64,
},
RngValue {
value: u64,
},
Checkpoint {
sequence: u64,
active_tasks: u32,
active_regions: u32,
},
Futurelock {
task: IdSnapshot,
region: IdSnapshot,
idle_steps: u64,
held: Vec<HeldObligationSnapshot>,
},
Monitor {
monitor_ref: u64,
watcher: IdSnapshot,
watcher_region: IdSnapshot,
monitored: IdSnapshot,
},
Down {
monitor_ref: u64,
watcher: IdSnapshot,
monitored: IdSnapshot,
completion_vt: u64,
reason: DownReasonSnapshot,
},
Link {
link_ref: u64,
task_a: IdSnapshot,
region_a: IdSnapshot,
task_b: IdSnapshot,
region_b: IdSnapshot,
},
Exit {
link_ref: u64,
from: IdSnapshot,
to: IdSnapshot,
failure_vt: u64,
reason: DownReasonSnapshot,
},
Message(String),
Chaos {
kind: String,
task: Option<IdSnapshot>,
detail: String,
},
Worker {
worker_id: String,
job_id: u64,
decision_seq: u64,
replay_hash: u64,
task: IdSnapshot,
region: IdSnapshot,
obligation: IdSnapshot,
},
}
impl EventDataSnapshot {
#[allow(clippy::too_many_lines)]
fn from_trace_data(data: &TraceData) -> Self {
match data {
TraceData::None => Self::None,
TraceData::Task { task, region } => Self::Task {
task: (*task).into(),
region: (*region).into(),
},
TraceData::Region { region, parent } => Self::Region {
region: (*region).into(),
parent: parent.map(IdSnapshot::from),
},
TraceData::Obligation {
obligation,
task,
region,
kind,
state,
duration_ns,
abort_reason,
} => Self::Obligation {
obligation: (*obligation).into(),
task: (*task).into(),
region: (*region).into(),
kind: ObligationKindSnapshot::from(*kind),
state: ObligationStateSnapshot::from(*state),
duration_ns: *duration_ns,
abort_reason: abort_reason.map(ObligationAbortReasonSnapshot::from),
},
TraceData::Cancel {
task,
region,
reason,
} => Self::Cancel {
task: (*task).into(),
region: (*region).into(),
reason: CancelReasonSnapshot::from(reason),
},
TraceData::RegionCancel { region, reason } => Self::RegionCancel {
region: (*region).into(),
reason: CancelReasonSnapshot::from(reason),
},
TraceData::Time { old, new } => Self::Time {
old: old.as_nanos(),
new: new.as_nanos(),
},
TraceData::Timer { timer_id, deadline } => Self::Timer {
timer_id: *timer_id,
deadline: deadline.map(Time::as_nanos),
},
TraceData::IoRequested { token, interest } => Self::IoRequested {
token: *token,
interest: *interest,
},
TraceData::IoReady { token, readiness } => Self::IoReady {
token: *token,
readiness: *readiness,
},
TraceData::IoResult { token, bytes } => Self::IoResult {
token: *token,
bytes: *bytes,
},
TraceData::IoError { token, kind } => Self::IoError {
token: *token,
kind: *kind,
},
TraceData::RngSeed { seed } => Self::RngSeed { seed: *seed },
TraceData::RngValue { value } => Self::RngValue { value: *value },
TraceData::Checkpoint {
sequence,
active_tasks,
active_regions,
} => Self::Checkpoint {
sequence: *sequence,
active_tasks: *active_tasks,
active_regions: *active_regions,
},
TraceData::Futurelock {
task,
region,
idle_steps,
held,
} => Self::Futurelock {
task: (*task).into(),
region: (*region).into(),
idle_steps: *idle_steps,
held: held
.iter()
.map(|(obligation, kind)| HeldObligationSnapshot {
obligation: (*obligation).into(),
kind: ObligationKindSnapshot::from(*kind),
})
.collect(),
},
TraceData::Monitor {
monitor_ref,
watcher,
watcher_region,
monitored,
} => Self::Monitor {
monitor_ref: *monitor_ref,
watcher: (*watcher).into(),
watcher_region: (*watcher_region).into(),
monitored: (*monitored).into(),
},
TraceData::Down {
monitor_ref,
watcher,
monitored,
completion_vt,
reason,
} => Self::Down {
monitor_ref: *monitor_ref,
watcher: (*watcher).into(),
monitored: (*monitored).into(),
completion_vt: completion_vt.as_nanos(),
reason: DownReasonSnapshot::from(reason),
},
TraceData::Link {
link_ref,
task_a,
region_a,
task_b,
region_b,
} => Self::Link {
link_ref: *link_ref,
task_a: (*task_a).into(),
region_a: (*region_a).into(),
task_b: (*task_b).into(),
region_b: (*region_b).into(),
},
TraceData::Exit {
link_ref,
from,
to,
failure_vt,
reason,
} => Self::Exit {
link_ref: *link_ref,
from: (*from).into(),
to: (*to).into(),
failure_vt: failure_vt.as_nanos(),
reason: DownReasonSnapshot::from(reason),
},
TraceData::Message(message) => Self::Message(message.clone()),
TraceData::Chaos { kind, task, detail } => Self::Chaos {
kind: kind.clone(),
task: task.map(IdSnapshot::from),
detail: detail.clone(),
},
TraceData::Worker {
worker_id,
job_id,
decision_seq,
replay_hash,
task,
region,
obligation,
} => Self::Worker {
worker_id: worker_id.clone(),
job_id: *job_id,
decision_seq: *decision_seq,
replay_hash: *replay_hash,
task: (*task).into(),
region: (*region).into(),
obligation: (*obligation).into(),
},
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HeldObligationSnapshot {
pub obligation: IdSnapshot,
pub kind: ObligationKindSnapshot,
}
#[cfg(test)]
#[allow(clippy::too_many_lines)]
mod tests {
use super::*;
use crate::observability::{LogEntry, ObservabilityConfig};
use crate::record::task::TaskState;
use crate::record::{ObligationKind, ObligationRecord, RegionLimits};
use crate::runtime::ModuleId;
use crate::runtime::reactor::LabReactor;
use crate::test_utils::init_test_logging;
use crate::time::{TimerDriverHandle, VirtualClock};
use crate::trace::event::TRACE_EVENT_SCHEMA_VERSION;
use crate::types::{CancelAttributionConfig, CancelKind};
use crate::util::ArenaIndex;
use parking_lot::Mutex;
use serde_json::{Value, json};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::task::{Context, Poll, Waker};
#[derive(Default)]
struct TestMetrics {
cancellations: AtomicUsize,
completions: Mutex<Vec<OutcomeKind>>,
spawns: AtomicUsize,
}
impl MetricsProvider for TestMetrics {
fn task_spawned(&self, _: RegionId, _: TaskId) {
self.spawns.fetch_add(1, Ordering::Relaxed);
}
fn task_completed(&self, _: TaskId, outcome: OutcomeKind, _: Duration) {
self.completions.lock().push(outcome);
}
fn region_created(&self, _: RegionId, _: Option<RegionId>) {}
fn region_closed(&self, _: RegionId, _: Duration) {}
fn cancellation_requested(&self, _: RegionId, _: CancelKind) {
self.cancellations.fetch_add(1, Ordering::Relaxed);
}
fn drain_completed(&self, _: RegionId, _: Duration) {}
fn deadline_set(&self, _: RegionId, _: Duration) {}
fn deadline_exceeded(&self, _: RegionId) {}
fn deadline_warning(&self, _: &str, _: &'static str, _: Duration) {}
fn deadline_violation(&self, _: &str, _: Duration) {}
fn deadline_remaining(&self, _: &str, _: Duration) {}
fn checkpoint_interval(&self, _: &str, _: Duration) {}
fn task_stuck_detected(&self, _: &str) {}
fn obligation_created(&self, _: RegionId) {}
fn obligation_discharged(&self, _: RegionId) {}
fn obligation_leaked(&self, _: RegionId) {}
fn scheduler_tick(&self, _: usize, _: Duration) {}
}
struct TestWaker(AtomicBool);
use std::task::Wake;
impl Wake for TestWaker {
fn wake(self: Arc<Self>) {
self.0.store(true, Ordering::SeqCst);
}
}
fn scrub_runtime_snapshot_for_snapshot_test(value: Value) -> Value {
match value {
Value::Object(map) => {
if map.len() == 2
&& map.get("index").is_some_and(Value::is_number)
&& map.get("generation").is_some_and(Value::is_number)
{
return Value::String("[id]".to_string());
}
Value::Object(
map.into_iter()
.map(|(key, value)| {
let scrubbed = match key.as_str() {
"timestamp" if value.is_number() => {
Value::String("[timestamp]".to_string())
}
"created_at" if value.is_number() => {
Value::String("[created_at]".to_string())
}
"time" if value.is_number() => {
Value::String("[event_time]".to_string())
}
"deadline" if value.is_number() => {
Value::String("[deadline]".to_string())
}
_ => scrub_runtime_snapshot_for_snapshot_test(value),
};
(key, scrubbed)
})
.collect(),
)
}
Value::Array(items) => Value::Array(
items
.into_iter()
.map(scrub_runtime_snapshot_for_snapshot_test)
.collect(),
),
other => other,
}
}
fn label_region_for_snapshot(
region: RegionId,
labels: &[(RegionId, &'static str)],
) -> &'static str {
labels
.iter()
.find_map(|(id, label)| (*id == region).then_some(*label))
.unwrap_or("[region]")
}
fn scrub_cancel_reason_chain_for_snapshot(
reason: &CancelReason,
labels: &[(RegionId, &'static str)],
) -> Value {
json!({
"kind": reason.kind.as_str(),
"message": reason.message.clone(),
"origin_region": label_region_for_snapshot(reason.origin_region, labels),
"timestamp": "[timestamp]",
"chain_depth": reason.chain_depth(),
"root_cause_kind": reason.root_cause().kind.as_str(),
"root_cause_message": reason.root_cause().message.clone(),
"truncated": reason.is_truncated(),
"truncated_at_depth": reason.truncated_at_depth(),
"any_truncated": reason.any_truncated(),
"chain": reason
.chain()
.enumerate()
.map(|(level, entry)| {
json!({
"level": level,
"kind": entry.kind.as_str(),
"message": entry.message.clone(),
"origin_region": label_region_for_snapshot(entry.origin_region, labels),
"timestamp": "[timestamp]",
"truncated": entry.is_truncated(),
"truncated_at_depth": entry.truncated_at_depth(),
})
})
.collect::<Vec<_>>(),
})
}
fn nested_region_cancel_cause_chain_dump(max_chain_depth: usize) -> Value {
let mut state = RuntimeState::new();
state.set_cancel_attribution_config(CancelAttributionConfig::new(
max_chain_depth,
CancelAttributionConfig::DEFAULT_MAX_MEMORY,
));
let root = state.create_root_region(Budget::INFINITE);
let child = create_child_region(&mut state, root);
let grandchild = create_child_region(&mut state, child);
let leaf = create_child_region(&mut state, grandchild);
let _ = insert_task(&mut state, root);
let _ = insert_task(&mut state, child);
let _ = insert_task(&mut state, grandchild);
let _ = insert_task(&mut state, leaf);
let labels = [
(root, "root"),
(child, "child"),
(grandchild, "grandchild"),
(leaf, "leaf"),
];
let reason = CancelReason::deadline()
.with_message("budget exhausted")
.with_timestamp(Time::from_millis(42));
let _ = state.cancel_request(root, &reason, None);
json!({
"config": {
"max_chain_depth": max_chain_depth,
"max_chain_memory": CancelAttributionConfig::DEFAULT_MAX_MEMORY,
},
"regions": {
"root": scrub_cancel_reason_chain_for_snapshot(
state
.regions
.get(root.arena_index())
.expect("root missing")
.cancel_reason()
.as_ref()
.expect("root cancel reason missing"),
&labels,
),
"child": scrub_cancel_reason_chain_for_snapshot(
state
.regions
.get(child.arena_index())
.expect("child missing")
.cancel_reason()
.as_ref()
.expect("child cancel reason missing"),
&labels,
),
"grandchild": scrub_cancel_reason_chain_for_snapshot(
state
.regions
.get(grandchild.arena_index())
.expect("grandchild missing")
.cancel_reason()
.as_ref()
.expect("grandchild cancel reason missing"),
&labels,
),
"leaf": scrub_cancel_reason_chain_for_snapshot(
state
.regions
.get(leaf.arena_index())
.expect("leaf missing")
.cancel_reason()
.as_ref()
.expect("leaf cancel reason missing"),
&labels,
),
},
})
}
fn init_test(name: &str) {
init_test_logging();
crate::test_phase!(name);
}
#[test]
fn epoch_tracker_advances_monotonically_per_runtime_module() {
init_test("epoch_tracker_advances_monotonically_per_runtime_module");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let _child = state
.create_child_region(root, Budget::INFINITE)
.expect("create child region");
let (task_id, _handle) = state
.create_task(root, Budget::INFINITE, async {})
.expect("create task");
let obligation_id = state
.create_obligation(ObligationKind::SendPermit, task_id, root, None)
.expect("create obligation");
let _ = state
.commit_obligation(obligation_id)
.expect("commit obligation");
let stats = state.epoch_tracker.transition_statistics();
crate::assert_with_log!(
stats
.per_module_stats
.get(&ModuleId::RegionTable)
.is_some_and(|s| s.current_epoch == EpochId::new(3) && s.transition_count == 3),
"region-table transitions advance monotonically instead of replaying genesis",
true,
stats
.per_module_stats
.get(&ModuleId::RegionTable)
.is_some_and(|s| s.current_epoch == EpochId::new(3) && s.transition_count == 3)
);
crate::assert_with_log!(
stats
.per_module_stats
.get(&ModuleId::TaskTable)
.is_some_and(|s| s.current_epoch == EpochId::new(1) && s.transition_count == 1),
"task-table transitions advance monotonically",
true,
stats
.per_module_stats
.get(&ModuleId::TaskTable)
.is_some_and(|s| s.current_epoch == EpochId::new(1) && s.transition_count == 1)
);
crate::assert_with_log!(
stats
.per_module_stats
.get(&ModuleId::ObligationTable)
.is_some_and(|s| s.current_epoch == EpochId::new(2) && s.transition_count == 2),
"obligation-table transitions advance monotonically",
true,
stats
.per_module_stats
.get(&ModuleId::ObligationTable)
.is_some_and(|s| s.current_epoch == EpochId::new(2) && s.transition_count == 2)
);
crate::test_complete!("epoch_tracker_advances_monotonically_per_runtime_module");
}
#[test]
fn epoch_tracker_counts_task_table_cleanup_mutations() {
init_test("epoch_tracker_counts_task_table_cleanup_mutations");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let (task_id, _handle) = state
.create_task(root, Budget::INFINITE, async {})
.expect("create task");
state
.task_mut(task_id)
.expect("task")
.complete(Outcome::Ok(()));
let _ = state.task_completed(task_id);
let stats = state.epoch_tracker.transition_statistics();
crate::assert_with_log!(
stats
.per_module_stats
.get(&ModuleId::TaskTable)
.is_some_and(|s| s.current_epoch == EpochId::new(2) && s.transition_count == 2),
"task-table epoch should advance for both task creation and cleanup",
true,
stats
.per_module_stats
.get(&ModuleId::TaskTable)
.is_some_and(|s| s.current_epoch == EpochId::new(2) && s.transition_count == 2)
);
crate::test_complete!("epoch_tracker_counts_task_table_cleanup_mutations");
}
#[test]
fn timer_driver_timestamps_runtime_records_and_snapshot() {
init_test("timer_driver_timestamps_runtime_records_and_snapshot");
let mut state = RuntimeState::new();
let clock = Arc::new(VirtualClock::starting_at(Time::from_millis(42)));
state.set_timer_driver(TimerDriverHandle::with_virtual_clock(clock));
let root = state.create_root_region(Budget::INFINITE);
let (task_id, _handle) = state
.create_task(root, Budget::INFINITE, async {})
.expect("create task");
let obligation_id = state
.create_obligation(ObligationKind::SendPermit, task_id, root, None)
.expect("create obligation");
let region = state.region(root).expect("root region");
crate::assert_with_log!(
region.created_at() == Time::from_millis(42),
"root region uses timer-driver time",
Time::from_millis(42),
region.created_at()
);
let task = state.task(task_id).expect("task");
crate::assert_with_log!(
task.created_at() == Time::from_millis(42),
"task uses timer-driver time",
Time::from_millis(42),
task.created_at()
);
let obligation = state.obligation(obligation_id).expect("obligation");
crate::assert_with_log!(
obligation.reserved_at == Time::from_millis(42),
"obligation uses timer-driver time",
Time::from_millis(42),
obligation.reserved_at
);
let snapshot = state.snapshot();
crate::assert_with_log!(
snapshot.timestamp == Time::from_millis(42).as_nanos(),
"snapshot timestamp uses timer-driver time",
Time::from_millis(42).as_nanos(),
snapshot.timestamp
);
crate::test_complete!("timer_driver_timestamps_runtime_records_and_snapshot");
}
#[test]
fn epoch_tracker_uses_timer_driver_transition_timestamps() {
init_test("epoch_tracker_uses_timer_driver_transition_timestamps");
let mut state = RuntimeState::new();
let clock = Arc::new(VirtualClock::starting_at(Time::from_millis(7)));
state.set_timer_driver(TimerDriverHandle::with_virtual_clock(clock.clone()));
let root = state.create_root_region(Budget::INFINITE);
let stats = state.epoch_tracker.transition_statistics();
crate::assert_with_log!(
stats
.per_module_stats
.get(&ModuleId::RegionTable)
.is_some_and(|s| s.last_transition_time == Time::from_millis(7)),
"region epoch transition uses initial timer-driver time",
true,
stats
.per_module_stats
.get(&ModuleId::RegionTable)
.is_some_and(|s| s.last_transition_time == Time::from_millis(7))
);
clock.advance(Time::from_millis(5).as_nanos());
let _child = state
.create_child_region(root, Budget::INFINITE)
.expect("create child region");
let stats = state.epoch_tracker.transition_statistics();
crate::assert_with_log!(
stats
.per_module_stats
.get(&ModuleId::RegionTable)
.is_some_and(|s| s.current_epoch == EpochId::new(2)
&& s.last_transition_time == Time::from_millis(12)),
"region epoch transition tracks later timer-driver advances",
true,
stats
.per_module_stats
.get(&ModuleId::RegionTable)
.is_some_and(|s| s.current_epoch == EpochId::new(2)
&& s.last_transition_time == Time::from_millis(12))
);
crate::test_complete!("epoch_tracker_uses_timer_driver_transition_timestamps");
}
#[test]
fn timer_driver_timestamps_cancel_traces() {
init_test("timer_driver_timestamps_cancel_traces");
let mut state = RuntimeState::new();
let clock = Arc::new(VirtualClock::starting_at(Time::from_millis(7)));
state.set_timer_driver(TimerDriverHandle::with_virtual_clock(clock.clone()));
let root = state.create_root_region(Budget::INFINITE);
let (task_id, _handle) = state
.create_task(root, Budget::INFINITE, async {})
.expect("create task");
clock.advance(Time::from_millis(5).as_nanos());
let expected_time = Time::from_millis(12);
let _ = state.cancel_request(root, &CancelReason::timeout(), None);
let events = state.trace.snapshot();
let cancel_event = events
.iter()
.find(|event| {
event.kind == TraceEventKind::CancelRequest
&& matches!(
event.data,
TraceData::Cancel { task, region, .. }
if task == task_id && region == root
)
})
.expect("cancel request event");
crate::assert_with_log!(
cancel_event.time == expected_time,
"cancel request trace uses timer-driver time",
expected_time,
cancel_event.time
);
let region_cancel_event = events
.iter()
.find(|event| {
event.kind == TraceEventKind::RegionCancelled
&& matches!(
event.data,
TraceData::RegionCancel { region, .. } if region == root
)
})
.expect("region cancelled event");
crate::assert_with_log!(
region_cancel_event.time == expected_time,
"region cancelled trace uses timer-driver time",
expected_time,
region_cancel_event.time
);
let region_close_begin = events
.iter()
.find(|event| {
event.kind == TraceEventKind::RegionCloseBegin
&& matches!(
event.data,
TraceData::Region {
region,
parent: None,
} if region == root
)
})
.expect("region close begin event");
crate::assert_with_log!(
region_close_begin.time == expected_time,
"region close begin trace uses timer-driver time",
expected_time,
region_close_begin.time
);
crate::test_complete!("timer_driver_timestamps_cancel_traces");
}
#[test]
fn timer_driver_timestamps_async_finalizer_deadline_and_history() {
init_test("timer_driver_timestamps_async_finalizer_deadline_and_history");
let mut state = RuntimeState::new();
let clock = Arc::new(VirtualClock::starting_at(Time::from_nanos(100)));
state.set_timer_driver(TimerDriverHandle::with_virtual_clock(clock.clone()));
let region = state.create_root_region(Budget::INFINITE);
let registered = state.register_async_finalizer(region, async {});
crate::assert_with_log!(registered, "registered", true, registered);
let region_record = state
.regions
.get_mut(region.arena_index())
.expect("region missing");
region_record.begin_close(None);
region_record.begin_finalize();
state.finalizing_regions.push(region);
clock.advance(23);
let scheduled = state.drain_ready_async_finalizers();
crate::assert_with_log!(
scheduled.len() == 1,
"scheduled len",
1usize,
scheduled.len()
);
let task_id = scheduled[0].0;
let expected_deadline =
Time::from_nanos(123).saturating_add_nanos(FINALIZER_TIME_BUDGET_NANOS);
let finalizer_deadline = state
.task(task_id)
.expect("async finalizer task missing")
.cx_inner
.as_ref()
.expect("async finalizer cx missing")
.read()
.budget
.deadline
.expect("async finalizer deadline");
crate::assert_with_log!(
finalizer_deadline == expected_deadline,
"async finalizer deadline uses timer-driver time",
expected_deadline,
finalizer_deadline
);
state
.task_mut(task_id)
.expect("async finalizer task missing")
.complete(Outcome::Ok(()));
clock.advance(14);
let _ = state.task_completed(task_id);
crate::assert_with_log!(
state.finalizer_history()
== [
FinalizerHistoryEvent::Registered {
id: 0,
region,
time: Time::from_nanos(100),
},
FinalizerHistoryEvent::Ran {
id: 0,
time: Time::from_nanos(137),
},
FinalizerHistoryEvent::RegionClosed {
region,
time: Time::from_nanos(137),
},
],
"async finalizer history uses timer-driver time",
"registered@100, ran@137, closed@137",
format!("{:?}", state.finalizer_history())
);
crate::test_complete!("timer_driver_timestamps_async_finalizer_deadline_and_history");
}
#[test]
fn advance_region_state_noop_does_not_advance_region_epoch() {
init_test("advance_region_state_noop_does_not_advance_region_epoch");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let before = state.epoch_tracker.transition_statistics();
state.advance_region_state(root);
let after = state.epoch_tracker.transition_statistics();
crate::assert_with_log!(
before
.per_module_stats
.get(&ModuleId::RegionTable)
.map(|s| (s.current_epoch, s.transition_count))
== after
.per_module_stats
.get(&ModuleId::RegionTable)
.map(|s| (s.current_epoch, s.transition_count)),
"no-op region scan must not fabricate epoch transitions",
before
.per_module_stats
.get(&ModuleId::RegionTable)
.map(|s| (s.current_epoch, s.transition_count)),
after
.per_module_stats
.get(&ModuleId::RegionTable)
.map(|s| (s.current_epoch, s.transition_count))
);
crate::test_complete!("advance_region_state_noop_does_not_advance_region_epoch");
}
fn insert_task(state: &mut RuntimeState, region: RegionId) -> TaskId {
let idx = state.insert_task(TaskRecord::new(
TaskId::from_arena(ArenaIndex::new(0, 0)),
region,
Budget::INFINITE,
));
let id = TaskId::from_arena(idx);
state.task_mut(id).expect("task missing").id = id;
let added = state
.regions
.get_mut(region.arena_index())
.expect("region missing")
.add_task(id);
crate::assert_with_log!(added.is_ok(), "task added to region", true, added.is_ok());
id
}
#[test]
fn cx_trace_emits_user_trace_event() {
init_test("cx_trace_emits_user_trace_event");
let metrics = Arc::new(TestMetrics::default());
let mut state = RuntimeState::new_with_metrics(metrics);
let root = state.create_root_region(Budget::INFINITE);
let (task_id, _handle) = state
.create_task(root, Budget::INFINITE, async { 1_u8 })
.expect("task spawn");
let cx = state
.task(task_id)
.and_then(|record| record.cx.clone())
.expect("cx missing");
cx.trace("user trace");
let saw_user_trace = state
.trace
.snapshot()
.iter()
.any(|event| event.kind == TraceEventKind::UserTrace);
crate::assert_with_log!(saw_user_trace, "user trace recorded", true, saw_user_trace);
crate::test_complete!("cx_trace_emits_user_trace_event");
}
#[test]
fn cx_log_attaches_collector_and_timestamp() {
init_test("cx_log_attaches_collector_and_timestamp");
let mut state = RuntimeState::new();
let clock = Arc::new(VirtualClock::starting_at(Time::from_millis(5)));
state.set_timer_driver(TimerDriverHandle::with_virtual_clock(clock));
state.set_observability_config(ObservabilityConfig::testing().with_max_log_entries(8));
let root = state.create_root_region(Budget::INFINITE);
let (task_id, _handle) = state
.create_task(root, Budget::INFINITE, async { 1_u8 })
.expect("task spawn");
let cx = state
.task(task_id)
.and_then(|record| record.cx.clone())
.expect("cx missing");
cx.log(LogEntry::info("hello"));
let collector = cx.log_collector().expect("collector missing");
let entries = collector.peek();
crate::assert_with_log!(entries.len() == 1, "log entry count", 1, entries.len());
let entry = &entries[0];
crate::assert_with_log!(
entry.message() == "hello",
"log entry message",
"hello",
entry.message()
);
crate::assert_with_log!(
entry.timestamp() == Time::from_millis(5),
"log entry timestamp",
Time::from_millis(5),
entry.timestamp()
);
let task_str = task_id.to_string();
let region_str = root.to_string();
crate::assert_with_log!(
entry.get_field("task_id") == Some(task_str.as_str()),
"log entry task id",
task_str.as_str(),
entry.get_field("task_id")
);
crate::assert_with_log!(
entry.get_field("region_id") == Some(region_str.as_str()),
"log entry region id",
region_str.as_str(),
entry.get_field("region_id")
);
crate::test_complete!("cx_log_attaches_collector_and_timestamp");
}
#[test]
fn cx_log_respects_timestamp_toggle() {
init_test("cx_log_respects_timestamp_toggle");
let mut state = RuntimeState::new();
let clock = Arc::new(VirtualClock::starting_at(Time::from_millis(9)));
state.set_timer_driver(TimerDriverHandle::with_virtual_clock(clock));
let config = ObservabilityConfig::testing().with_include_timestamps(false);
state.set_observability_config(config);
let root = state.create_root_region(Budget::INFINITE);
let (task_id, _handle) = state
.create_task(root, Budget::INFINITE, async { 1_u8 })
.expect("task spawn");
let cx = state
.task(task_id)
.and_then(|record| record.cx.clone())
.expect("cx missing");
cx.log(LogEntry::info("no timestamps"));
let collector = cx.log_collector().expect("collector missing");
let entries = collector.peek();
crate::assert_with_log!(entries.len() == 1, "log entry count", 1, entries.len());
let entry = &entries[0];
crate::assert_with_log!(
entry.timestamp() == Time::ZERO,
"timestamps disabled",
Time::ZERO,
entry.timestamp()
);
crate::test_complete!("cx_log_respects_timestamp_toggle");
}
#[test]
fn cancel_request_emits_trace_and_metrics() {
init_test("cancel_request_emits_trace_and_metrics");
let metrics = Arc::new(TestMetrics::default());
let mut state = RuntimeState::new_with_metrics(metrics.clone());
let root = state.create_root_region(Budget::INFINITE);
let _ = state
.create_task(root, Budget::INFINITE, async { 1_u8 })
.expect("task spawn");
let reason = CancelReason::timeout();
let _ = state.cancel_request(root, &reason, None);
let events = state.trace.snapshot();
let saw_cancel = events
.iter()
.any(|event| event.kind == TraceEventKind::CancelRequest);
crate::assert_with_log!(saw_cancel, "cancel trace recorded", true, saw_cancel);
let cancellations = metrics.cancellations.load(Ordering::Relaxed);
crate::assert_with_log!(
cancellations == 1,
"cancellation metrics",
1usize,
cancellations
);
crate::test_complete!("cancel_request_emits_trace_and_metrics");
}
#[test]
fn spawn_trace_attaches_logical_time() {
init_test("spawn_trace_attaches_logical_time");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let _ = state
.create_task(root, Budget::INFINITE, async { 1_u8 })
.expect("task spawn");
let events = state.trace.snapshot();
let spawn_event = events
.iter()
.find(|event| event.kind == TraceEventKind::Spawn)
.expect("spawn event");
crate::assert_with_log!(
spawn_event.logical_time.is_some(),
"spawn logical time present",
true,
spawn_event.logical_time.is_some()
);
crate::test_complete!("spawn_trace_attaches_logical_time");
}
#[test]
fn cancellation_outcome_metric_emitted() {
init_test("cancellation_outcome_metric_emitted");
let metrics = Arc::new(TestMetrics::default());
let mut state = RuntimeState::new_with_metrics(metrics.clone());
let root = state.create_root_region(Budget::INFINITE);
let (task_id, _handle) = state
.create_task(root, Budget::INFINITE, async { 1_u8 })
.expect("task spawn");
if let Some(record) = state.task_mut(task_id) {
record.complete(Outcome::Cancelled(CancelReason::timeout()));
}
let _ = state.task_completed(task_id);
let saw_cancelled = metrics.completions.lock().contains(&OutcomeKind::Cancelled);
crate::assert_with_log!(
saw_cancelled,
"cancelled outcome metric",
true,
saw_cancelled
);
crate::test_complete!("cancellation_outcome_metric_emitted");
}
#[test]
fn create_task_panic_reaches_join_handle() {
init_test("create_task_panic_reaches_join_handle");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let (task_id, mut handle) = state
.create_task(root, Budget::INFINITE, async {
panic!("state task boom");
#[allow(unreachable_code)]
1_u8
})
.expect("task create");
let waker = Waker::from(Arc::new(TestWaker(AtomicBool::new(false))));
let mut poll_cx = Context::from_waker(&waker);
let stored = state.get_stored_future(task_id).expect("stored task");
match stored.poll(&mut poll_cx) {
Poll::Ready(Outcome::Panicked(payload)) => {
crate::assert_with_log!(
payload.message() == "state task boom",
"panic payload captured on stored task",
"state task boom",
payload.message()
);
}
other => panic!("panicking task must complete with Outcome::Panicked: {other:?}"),
}
let task_cx = state
.task(task_id)
.and_then(|record| record.cx.clone())
.expect("task cx");
let mut join_fut = std::pin::pin!(handle.join(&task_cx));
match join_fut.as_mut().poll(&mut poll_cx) {
Poll::Ready(Err(crate::runtime::task_handle::JoinError::Panicked(payload))) => {
crate::assert_with_log!(
payload.message() == "state task boom",
"join handle receives panic payload",
"state task boom",
payload.message()
);
}
other => {
panic!("join of panicked state task must return JoinError::Panicked: {other:?}")
}
}
crate::test_complete!("create_task_panic_reaches_join_handle");
}
#[test]
fn snapshot_captures_entities() {
init_test("snapshot_captures_entities");
let mut state = RuntimeState::new();
let region = state.create_root_region(Budget::INFINITE);
let (task_id, _handle) = state
.create_task(region, Budget::INFINITE, async { 42 })
.expect("task create");
let obl_idx = state.obligations.insert(ObligationRecord::new(
ObligationId::from_arena(ArenaIndex::new(0, 0)),
ObligationKind::SendPermit,
task_id,
region,
state.now,
));
let obl_id = ObligationId::from_arena(obl_idx);
state
.obligations
.get_mut(obl_idx)
.expect("obligation missing")
.id = obl_id;
let snapshot = state.snapshot();
crate::assert_with_log!(
snapshot.regions.len() == 1,
"region count",
1,
snapshot.regions.len()
);
crate::assert_with_log!(
snapshot.tasks.len() == 1,
"task count",
1,
snapshot.tasks.len()
);
crate::assert_with_log!(
snapshot.obligations.len() == 1,
"obligation count",
1,
snapshot.obligations.len()
);
let task_snapshot = snapshot
.tasks
.iter()
.find(|t| t.id == IdSnapshot::from(task_id))
.expect("task snapshot missing");
let has_obligation = task_snapshot
.obligations
.contains(&IdSnapshot::from(obl_id));
crate::assert_with_log!(has_obligation, "task has obligation", true, has_obligation);
crate::test_complete!("snapshot_captures_entities");
}
#[test]
fn snapshot_preserves_event_version() {
init_test("snapshot_preserves_event_version");
let state = RuntimeState::new();
let event = TraceEvent::new(1, Time::ZERO, TraceEventKind::UserTrace, TraceData::None);
state.trace.push_event(event);
let snapshot = state.snapshot();
let event_snapshot = snapshot
.recent_events
.first()
.expect("event snapshot missing");
crate::assert_with_log!(
event_snapshot.version == TRACE_EVENT_SCHEMA_VERSION,
"event version",
TRACE_EVENT_SCHEMA_VERSION,
event_snapshot.version
);
crate::test_complete!("snapshot_preserves_event_version");
}
#[test]
fn snapshot_json_scrubs_ids_and_timestamps() {
init_test("snapshot_json_scrubs_ids_and_timestamps");
let mut state = RuntimeState::new();
let region = state.create_root_region(Budget::INFINITE);
let (task_id, _handle) = state
.create_task(region, Budget::INFINITE, async { 42 })
.expect("task create");
let obligation_idx = state.obligations.insert(ObligationRecord::new(
ObligationId::from_arena(ArenaIndex::new(0, 0)),
ObligationKind::SendPermit,
task_id,
region,
state.now,
));
let obligation_id = ObligationId::from_arena(obligation_idx);
state
.obligations
.get_mut(obligation_idx)
.expect("obligation missing")
.id = obligation_id;
state.trace.push_event(TraceEvent::new(
99,
Time::from_millis(42),
TraceEventKind::UserTrace,
TraceData::None,
));
let snapshot = state.snapshot();
insta::assert_json_snapshot!(
"runtime_snapshot_entities_scrubbed",
scrub_runtime_snapshot_for_snapshot_test(serde_json::to_value(&snapshot).unwrap())
);
crate::test_complete!("snapshot_json_scrubs_ids_and_timestamps");
}
#[test]
fn region_cancel_cause_chain_dump_scrubbed() {
init_test("region_cancel_cause_chain_dump_scrubbed");
insta::assert_json_snapshot!(
"region_cancel_cause_chain_dump_scrubbed",
json!({
"full_chain": nested_region_cancel_cause_chain_dump(
CancelAttributionConfig::DEFAULT_MAX_DEPTH,
),
"depth_limited_chain": nested_region_cancel_cause_chain_dump(3),
})
);
crate::test_complete!("region_cancel_cause_chain_dump_scrubbed");
}
#[test]
fn can_region_complete_close_checks_running_finalizer_tasks() {
init_test("can_region_complete_close_checks_running_finalizer_tasks");
let mut state = RuntimeState::new();
let region = state.create_root_region(Budget::INFINITE);
let region_record = state.regions.get_mut(region.arena_index()).expect("region");
region_record.begin_close(None);
region_record.begin_finalize();
let task = insert_task(&mut state, region);
state.task_mut(task).expect("task").start_running();
let can_close = state.can_region_complete_close(region);
crate::assert_with_log!(
!can_close,
"cannot close with running task",
false,
can_close
);
state
.task_mut(task)
.expect("task")
.complete(Outcome::Ok(()));
let region_record = state.regions.get(region.arena_index()).expect("region");
region_record.remove_task(task);
let can_close = state.can_region_complete_close(region);
crate::assert_with_log!(can_close, "can close after task completes", true, can_close);
crate::test_complete!("can_region_complete_close_checks_running_finalizer_tasks");
}
#[test]
fn empty_state_is_quiescent() {
init_test("empty_state_is_quiescent");
let state = RuntimeState::new();
let quiescent = state.is_quiescent();
crate::assert_with_log!(quiescent, "state quiescent", true, quiescent);
crate::test_complete!("empty_state_is_quiescent");
}
#[test]
fn create_root_region() {
init_test("create_root_region");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
crate::assert_with_log!(
state.root_region.is_some(),
"root region set",
true,
state.root_region.is_some()
);
crate::assert_with_log!(
state.root_region == Some(root),
"root id matches",
Some(root),
state.root_region
);
crate::assert_with_log!(
state.live_region_count() == 1,
"live region count",
1usize,
state.live_region_count()
);
crate::test_complete!("create_root_region");
}
#[test]
fn policy_can_cancel_siblings() {
init_test("policy_can_cancel_siblings");
let mut state = RuntimeState::new();
let region = state.create_root_region(Budget::INFINITE);
let child = insert_task(&mut state, region);
let sib1 = insert_task(&mut state, region);
let sib2 = insert_task(&mut state, region);
let policy = crate::types::policy::FailFast;
let outcome = Outcome::<(), crate::error::Error>::Err(crate::error::Error::new(
crate::error::ErrorKind::User,
));
let (action, tasks) = state.apply_policy_on_child_outcome(region, child, &outcome, &policy);
let expected_action = PolicyAction::CancelSiblings(CancelReason::sibling_failed());
crate::assert_with_log!(
action == expected_action,
"cancel siblings action",
expected_action,
action
);
crate::assert_with_log!(tasks.len() == 2, "tasks len", 2usize, tasks.len());
for sib in [sib1, sib2] {
let record = state.task(sib).expect("sib missing");
let is_cancel_requested = matches!(&record.state, TaskState::CancelRequested { .. });
assert!(
is_cancel_requested,
"expected CancelRequested, got {:?}",
record.state
);
if let TaskState::CancelRequested { reason, .. } = &record.state {
crate::assert_with_log!(
reason.kind == CancelKind::FailFast,
"cancel reason kind",
CancelKind::FailFast,
reason.kind
);
}
}
let child_record = state.task(child).expect("child missing");
let is_created = matches!(child_record.state, TaskState::Created);
crate::assert_with_log!(is_created, "child remains created", true, is_created);
crate::test_complete!("policy_can_cancel_siblings");
}
#[test]
fn policy_does_not_cancel_siblings_on_cancelled_child() {
init_test("policy_does_not_cancel_siblings_on_cancelled_child");
let mut state = RuntimeState::new();
let region = state.create_root_region(Budget::INFINITE);
let child = insert_task(&mut state, region);
let sib = insert_task(&mut state, region);
let policy = crate::types::policy::FailFast;
let outcome = Outcome::<(), crate::error::Error>::Cancelled(CancelReason::timeout());
let (action, _) = state.apply_policy_on_child_outcome(region, child, &outcome, &policy);
crate::assert_with_log!(
action == PolicyAction::Continue,
"action continue",
PolicyAction::Continue,
action
);
let sib_record = state.task(sib).expect("sib missing");
let is_created = matches!(sib_record.state, TaskState::Created);
crate::assert_with_log!(is_created, "sibling remains created", true, is_created);
crate::test_complete!("policy_does_not_cancel_siblings_on_cancelled_child");
}
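// Test helper: inserts a child RegionRecord with a placeholder id, patches the id
// to the arena-assigned index, and links the child into the parent's child list.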
fn create_child_region(state: &mut RuntimeState, parent: RegionId) -> RegionId {
let idx = state.regions.insert(RegionRecord::new(
RegionId::from_arena(ArenaIndex::new(0, 0)),
Some(parent),
Budget::INFINITE,
));
let id = RegionId::from_arena(idx);
state.regions.get_mut(idx).expect("region missing").id = id;
let added = state
.regions
.get_mut(parent.arena_index())
.expect("parent missing")
.add_child(id);
crate::assert_with_log!(added.is_ok(), "child added to parent", true, added.is_ok());
id
}
#[test]
fn cancel_request_marks_region() {
init_test("cancel_request_marks_region");
let mut state = RuntimeState::new();
let region = state.create_root_region(Budget::INFINITE);
let idx = state.insert_task_with(|idx| {
TaskRecord::new_with_time(
TaskId::from_arena(idx),
region,
Budget::INFINITE,
Time::ZERO,
)
});
state
.regions
.get(region.arena_index())
.unwrap()
.add_task(TaskId::from_arena(idx))
.unwrap();
let _tasks = state.cancel_request(region, &CancelReason::timeout(), None);
let region_record = state
.regions
.get(region.arena_index())
.expect("region missing");
let cancel_reason = region_record.cancel_reason();
crate::assert_with_log!(
cancel_reason.is_some(),
"cancel reason set",
true,
cancel_reason.is_some()
);
let kind = cancel_reason.as_ref().unwrap().kind;
crate::assert_with_log!(
kind == CancelKind::Timeout,
"cancel kind timeout",
CancelKind::Timeout,
kind
);
crate::test_complete!("cancel_request_marks_region");
}
#[test]
fn cancel_request_marks_tasks() {
init_test("cancel_request_marks_tasks");
let mut state = RuntimeState::new();
let region = state.create_root_region(Budget::INFINITE);
let task1 = insert_task(&mut state, region);
let task2 = insert_task(&mut state, region);
let tasks_to_schedule = state.cancel_request(region, &CancelReason::timeout(), None);
crate::assert_with_log!(
tasks_to_schedule.len() == 2,
"tasks scheduled",
2usize,
tasks_to_schedule.len()
);
let task_ids: Vec<_> = tasks_to_schedule.iter().map(|(id, _)| *id).collect();
crate::assert_with_log!(
task_ids.contains(&task1),
"contains task1",
true,
task_ids.contains(&task1)
);
crate::assert_with_log!(
task_ids.contains(&task2),
"contains task2",
true,
task_ids.contains(&task2)
);
for (task_id, _) in tasks_to_schedule {
let task = state.task(task_id).expect("task missing");
let is_cancel_requested = matches!(task.state, TaskState::CancelRequested { .. });
crate::assert_with_log!(
is_cancel_requested,
"task cancel requested",
true,
is_cancel_requested
);
}
crate::test_complete!("cancel_request_marks_tasks");
}
#[test]
fn cancel_request_propagates_to_descendants() {
init_test("cancel_request_propagates_to_descendants");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let child = create_child_region(&mut state, root);
let grandchild = create_child_region(&mut state, child);
let root_task = insert_task(&mut state, root);
let child_task = insert_task(&mut state, child);
let grandchild_task = insert_task(&mut state, grandchild);
let tasks_to_schedule = state.cancel_request(root, &CancelReason::user("stop"), None);
crate::assert_with_log!(
tasks_to_schedule.len() == 3,
"tasks scheduled",
3usize,
tasks_to_schedule.len()
);
let root_record = state.regions.get(root.arena_index()).expect("root missing");
let root_kind = root_record.cancel_reason().as_ref().unwrap().kind;
crate::assert_with_log!(
root_kind == CancelKind::User,
"root cancel kind",
CancelKind::User,
root_kind
);
let child_record = state
.regions
.get(child.arena_index())
.expect("child missing");
let child_kind = child_record.cancel_reason().as_ref().unwrap().kind;
crate::assert_with_log!(
child_kind == CancelKind::ParentCancelled,
"child cancel kind",
CancelKind::ParentCancelled,
child_kind
);
let grandchild_record = state
.regions
.get(grandchild.arena_index())
.expect("grandchild missing");
let grandchild_kind = grandchild_record.cancel_reason().as_ref().unwrap().kind;
crate::assert_with_log!(
grandchild_kind == CancelKind::ParentCancelled,
"grandchild cancel kind",
CancelKind::ParentCancelled,
grandchild_kind
);
let root_task_record = state.task(root_task).expect("task missing");
let is_cancel_requested =
matches!(&root_task_record.state, TaskState::CancelRequested { .. });
assert!(
is_cancel_requested,
"expected CancelRequested, got {:?}",
root_task_record.state
);
if let TaskState::CancelRequested { reason, .. } = &root_task_record.state {
crate::assert_with_log!(
reason.kind == CancelKind::User,
"root task cancel kind",
CancelKind::User,
reason.kind
);
}
let child_task_record = state.task(child_task).expect("task missing");
let is_cancel_requested =
matches!(&child_task_record.state, TaskState::CancelRequested { .. });
assert!(
is_cancel_requested,
"expected CancelRequested, got {:?}",
child_task_record.state
);
if let TaskState::CancelRequested { reason, .. } = &child_task_record.state {
crate::assert_with_log!(
reason.kind == CancelKind::ParentCancelled,
"child task cancel kind",
CancelKind::ParentCancelled,
reason.kind
);
}
let grandchild_task_record = state.task(grandchild_task).expect("task missing");
let is_cancel_requested = matches!(
&grandchild_task_record.state,
TaskState::CancelRequested { .. }
);
assert!(
is_cancel_requested,
"expected CancelRequested, got {:?}",
grandchild_task_record.state
);
if let TaskState::CancelRequested { reason, .. } = &grandchild_task_record.state {
crate::assert_with_log!(
reason.kind == CancelKind::ParentCancelled,
"grandchild task cancel kind",
CancelKind::ParentCancelled,
reason.kind
);
}
crate::test_complete!("cancel_request_propagates_to_descendants");
}
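// Cause-chain attribution: each region level wraps the parent's reason, so
// chain_depth grows by one per level while root_cause() stays the original reason.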
#[test]
#[allow(clippy::too_many_lines)]
fn cancel_request_builds_cause_chains() {
init_test("cancel_request_builds_cause_chains");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let child = create_child_region(&mut state, root);
let grandchild = create_child_region(&mut state, child);
let root_task = insert_task(&mut state, root);
let child_task = insert_task(&mut state, child);
let grandchild_task = insert_task(&mut state, grandchild);
let original_reason = CancelReason::deadline().with_message("budget exhausted");
let _ = state.cancel_request(root, &original_reason, None);
let root_record = state.regions.get(root.arena_index()).expect("root missing");
let root_reason_opt = root_record.cancel_reason();
let root_reason = root_reason_opt.as_ref().unwrap();
crate::assert_with_log!(
root_reason.kind == CancelKind::Deadline,
"root reason kind",
CancelKind::Deadline,
root_reason.kind
);
crate::assert_with_log!(
root_reason.chain_depth() == 1,
"root chain depth",
1,
root_reason.chain_depth()
);
crate::assert_with_log!(
root_reason.cause.is_none(),
"root has no cause",
true,
root_reason.cause.is_none()
);
let child_record = state
.regions
.get(child.arena_index())
.expect("child missing");
let child_reason_opt = child_record.cancel_reason();
let child_reason = child_reason_opt.as_ref().unwrap();
crate::assert_with_log!(
child_reason.kind == CancelKind::ParentCancelled,
"child reason kind",
CancelKind::ParentCancelled,
child_reason.kind
);
crate::assert_with_log!(
child_reason.chain_depth() == 2,
"child chain depth",
2,
child_reason.chain_depth()
);
let child_root_cause = child_reason.root_cause();
crate::assert_with_log!(
child_root_cause.kind == CancelKind::Deadline,
"child root cause kind",
CancelKind::Deadline,
child_root_cause.kind
);
crate::assert_with_log!(
child_reason.origin_region == root,
"child origin region",
root,
child_reason.origin_region
);
let grandchild_record = state
.regions
.get(grandchild.arena_index())
.expect("grandchild missing");
let grandchild_reason_opt = grandchild_record.cancel_reason();
let grandchild_reason = grandchild_reason_opt.as_ref().unwrap();
crate::assert_with_log!(
grandchild_reason.kind == CancelKind::ParentCancelled,
"grandchild reason kind",
CancelKind::ParentCancelled,
grandchild_reason.kind
);
crate::assert_with_log!(
grandchild_reason.chain_depth() == 3,
"grandchild chain depth",
3,
grandchild_reason.chain_depth()
);
let grandchild_root_cause = grandchild_reason.root_cause();
crate::assert_with_log!(
grandchild_root_cause.kind == CancelKind::Deadline,
"grandchild root cause kind",
CancelKind::Deadline,
grandchild_root_cause.kind
);
crate::assert_with_log!(
grandchild_reason.origin_region == child,
"grandchild origin region",
child,
grandchild_reason.origin_region
);
let grandchild_task_record = state.task(grandchild_task).expect("task missing");
let is_cancel_requested = matches!(
&grandchild_task_record.state,
TaskState::CancelRequested { .. }
);
assert!(
is_cancel_requested,
"expected CancelRequested, got {:?}",
grandchild_task_record.state
);
if let TaskState::CancelRequested { reason, .. } = &grandchild_task_record.state {
crate::assert_with_log!(
reason.chain_depth() == 3,
"grandchild task chain depth",
3,
reason.chain_depth()
);
crate::assert_with_log!(
reason.root_cause().kind == CancelKind::Deadline,
"grandchild task root cause",
CancelKind::Deadline,
reason.root_cause().kind
);
}
let chain: Vec<_> = grandchild_reason.chain().collect();
crate::assert_with_log!(chain.len() == 3, "chain length", 3, chain.len());
crate::assert_with_log!(
chain[0].kind == CancelKind::ParentCancelled,
"chain[0] kind",
CancelKind::ParentCancelled,
chain[0].kind
);
crate::assert_with_log!(
chain[1].kind == CancelKind::ParentCancelled,
"chain[1] kind",
CancelKind::ParentCancelled,
chain[1].kind
);
crate::assert_with_log!(
chain[2].kind == CancelKind::Deadline,
"chain[2] kind",
CancelKind::Deadline,
chain[2].kind
);
let _ = root_task;
let _ = child_task;
crate::test_complete!("cancel_request_builds_cause_chains");
}
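// With CancelAttributionConfig::new(2, 256), chains deeper than two links are
// truncated and the truncation depth is recorded on the propagated reason.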
#[test]
fn cancel_request_respects_attribution_limits() {
init_test("cancel_request_respects_attribution_limits");
let mut state = RuntimeState::new();
state.set_cancel_attribution_config(CancelAttributionConfig::new(2, 256));
let root = state.create_root_region(Budget::INFINITE);
let idx_root = state.insert_task_with(|idx| {
TaskRecord::new_with_time(TaskId::from_arena(idx), root, Budget::INFINITE, Time::ZERO)
});
state
.regions
.get(root.arena_index())
.unwrap()
.add_task(TaskId::from_arena(idx_root))
.unwrap();
let child = create_child_region(&mut state, root);
let idx_child = state.insert_task_with(|idx| {
TaskRecord::new_with_time(TaskId::from_arena(idx), child, Budget::INFINITE, Time::ZERO)
});
state
.regions
.get(child.arena_index())
.unwrap()
.add_task(TaskId::from_arena(idx_child))
.unwrap();
let grandchild = create_child_region(&mut state, child);
let idx_grandchild = state.insert_task_with(|idx| {
TaskRecord::new_with_time(
TaskId::from_arena(idx),
grandchild,
Budget::INFINITE,
Time::ZERO,
)
});
state
.regions
.get(grandchild.arena_index())
.unwrap()
.add_task(TaskId::from_arena(idx_grandchild))
.unwrap();
let reason = CancelReason::deadline().with_message("root deadline");
let _ = state.cancel_request(root, &reason, None);
let child_reason = state
.regions
.get(child.arena_index())
.and_then(RegionRecord::cancel_reason)
.expect("child cancel reason missing");
crate::assert_with_log!(
child_reason.chain_depth() == 2,
"child chain depth",
2,
child_reason.chain_depth()
);
crate::assert_with_log!(
!child_reason.truncated,
"child chain not truncated",
false,
child_reason.truncated
);
let grandchild_reason = state
.regions
.get(grandchild.arena_index())
.and_then(RegionRecord::cancel_reason)
.expect("grandchild cancel reason missing");
crate::assert_with_log!(
grandchild_reason.chain_depth() == 2,
"grandchild chain depth",
2,
grandchild_reason.chain_depth()
);
crate::assert_with_log!(
grandchild_reason.truncated,
"grandchild chain truncated",
true,
grandchild_reason.truncated
);
crate::assert_with_log!(
grandchild_reason.truncated_at_depth == Some(2),
"grandchild truncation depth",
Some(2),
grandchild_reason.truncated_at_depth
);
crate::test_complete!("cancel_request_respects_attribution_limits");
}
#[test]
fn cancel_request_respects_chain_depth_limit() {
init_test("cancel_request_respects_chain_depth_limit");
let mut state = RuntimeState::new();
state.set_cancel_attribution_config(CancelAttributionConfig::new(2, usize::MAX));
let root = state.create_root_region(Budget::INFINITE);
let mut current = root;
for _ in 0..4 {
current = create_child_region(&mut state, current);
}
let leaf_task = insert_task(&mut state, current);
let _ = state.cancel_request(root, &CancelReason::timeout(), None);
let leaf_record = state
.regions
.get(current.arena_index())
.expect("leaf missing");
let binding = leaf_record.cancel_reason();
let leaf_reason = binding.as_ref().expect("reason missing");
crate::assert_with_log!(
leaf_reason.chain_depth() <= 2,
"leaf chain depth bounded",
2,
leaf_reason.chain_depth()
);
crate::assert_with_log!(
leaf_reason.any_truncated(),
"leaf reason truncated",
true,
leaf_reason.any_truncated()
);
let leaf_task_record = state
.tasks
.get(leaf_task.arena_index())
.expect("task missing");
match &leaf_task_record.state {
TaskState::CancelRequested { reason, .. } => {
crate::assert_with_log!(
reason.chain_depth() <= 2,
"leaf task chain depth bounded",
2,
reason.chain_depth()
);
crate::assert_with_log!(
reason.any_truncated(),
"leaf task reason truncated",
true,
reason.any_truncated()
);
}
_other => {
unreachable!("expected CancelRequested");
}
}
crate::test_complete!("cancel_request_respects_chain_depth_limit");
}
#[test]
fn cancel_request_truncates_large_tree() {
init_test("cancel_request_truncates_large_tree");
let mut state = RuntimeState::new();
state.set_cancel_attribution_config(CancelAttributionConfig::new(4, 256));
let root = state.create_root_region(Budget::INFINITE);
let mut current = root;
for _ in 0..64 {
current = create_child_region(&mut state, current);
}
let leaf_task = insert_task(&mut state, current);
let _ = state.cancel_request(root, &CancelReason::shutdown(), None);
let leaf_record = state
.regions
.get(current.arena_index())
.expect("leaf missing");
let binding = leaf_record.cancel_reason();
let leaf_reason = binding.as_ref().expect("reason missing");
crate::assert_with_log!(
leaf_reason.chain_depth() <= 4,
"large tree chain depth bounded",
4,
leaf_reason.chain_depth()
);
crate::assert_with_log!(
leaf_reason.any_truncated(),
"large tree reason truncated",
true,
leaf_reason.any_truncated()
);
let leaf_task_record = state
.tasks
.get(leaf_task.arena_index())
.expect("task missing");
match &leaf_task_record.state {
TaskState::CancelRequested { reason, .. } => {
crate::assert_with_log!(
reason.chain_depth() <= 4,
"large tree task chain depth bounded",
4,
reason.chain_depth()
);
crate::assert_with_log!(
reason.any_truncated(),
"large tree task reason truncated",
true,
reason.any_truncated()
);
}
_other => {
unreachable!("expected CancelRequested");
}
}
crate::test_complete!("cancel_request_truncates_large_tree");
}
#[test]
fn cancel_request_strengthens_existing_reason() {
init_test("cancel_request_strengthens_existing_reason");
let mut state = RuntimeState::new();
let region = state.create_root_region(Budget::INFINITE);
let task = insert_task(&mut state, region);
let _ = state.cancel_request(region, &CancelReason::user("stop"), None);
let _ = state.cancel_request(region, &CancelReason::shutdown(), None);
let region_record = state
.regions
.get(region.arena_index())
.expect("region missing");
let region_kind = region_record.cancel_reason().as_ref().unwrap().kind;
crate::assert_with_log!(
region_kind == CancelKind::Shutdown,
"region cancel kind",
CancelKind::Shutdown,
region_kind
);
let task_record = state.task(task).expect("task missing");
let is_cancel_requested = matches!(&task_record.state, TaskState::CancelRequested { .. });
assert!(
is_cancel_requested,
"expected CancelRequested, got {:?}",
task_record.state
);
if let TaskState::CancelRequested { reason, .. } = &task_record.state {
crate::assert_with_log!(
reason.kind == CancelKind::Shutdown,
"task cancel kind",
CancelKind::Shutdown,
reason.kind
);
}
crate::test_complete!("cancel_request_strengthens_existing_reason");
}
#[test]
fn can_region_finalize_with_all_tasks_done() {
init_test("can_region_finalize_with_all_tasks_done");
let mut state = RuntimeState::new();
let region = state.create_root_region(Budget::INFINITE);
let task = insert_task(&mut state, region);
let can_finalize = state.can_region_finalize(region);
crate::assert_with_log!(
!can_finalize,
"cannot finalize with live task",
false,
can_finalize
);
state
.task_mut(task)
.expect("task missing")
.complete(Outcome::Ok(()));
let can_finalize = state.can_region_finalize(region);
crate::assert_with_log!(can_finalize, "can finalize", true, can_finalize);
crate::test_complete!("can_region_finalize_with_all_tasks_done");
}
#[test]
fn can_region_finalize_requires_child_regions_closed() {
init_test("can_region_finalize_requires_child_regions_closed");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let child = create_child_region(&mut state, root);
let can_finalize = state.can_region_finalize(root);
crate::assert_with_log!(
!can_finalize,
"cannot finalize with open child",
false,
can_finalize
);
let child_record = state
.regions
.get_mut(child.arena_index())
.expect("child missing");
child_record.begin_close(None);
child_record.begin_finalize();
child_record.complete_close();
let can_finalize = state.can_region_finalize(root);
crate::assert_with_log!(can_finalize, "can finalize", true, can_finalize);
crate::test_complete!("can_region_finalize_requires_child_regions_closed");
}
#[test]
fn register_sync_finalizer() {
init_test("register_sync_finalizer");
let mut state = RuntimeState::new();
let region = state.create_root_region(Budget::INFINITE);
crate::assert_with_log!(
state.region_finalizers_empty(region),
"finalizers empty",
true,
state.region_finalizers_empty(region)
);
crate::assert_with_log!(
state.region_finalizer_count(region) == 0,
"finalizer count",
0usize,
state.region_finalizer_count(region)
);
let registered = state.register_sync_finalizer(region, || {});
crate::assert_with_log!(registered, "register sync finalizer", true, registered);
crate::assert_with_log!(
!state.region_finalizers_empty(region),
"finalizers not empty",
false,
state.region_finalizers_empty(region)
);
crate::assert_with_log!(
state.region_finalizer_count(region) == 1,
"finalizer count",
1usize,
state.region_finalizer_count(region)
);
crate::test_complete!("register_sync_finalizer");
}
#[test]
fn register_async_finalizer() {
init_test("register_async_finalizer");
let mut state = RuntimeState::new();
let region = state.create_root_region(Budget::INFINITE);
let registered = state.register_async_finalizer(region, async {});
crate::assert_with_log!(registered, "register async finalizer", true, registered);
crate::assert_with_log!(
state.region_finalizer_count(region) == 1,
"finalizer count",
1usize,
state.region_finalizer_count(region)
);
crate::test_complete!("register_async_finalizer");
}
#[test]
fn register_finalizer_fails_when_region_not_open() {
init_test("register_finalizer_fails_when_region_not_open");
let mut state = RuntimeState::new();
let region = state.create_root_region(Budget::INFINITE);
state
.regions
.get_mut(region.arena_index())
.expect("region missing")
.begin_close(None);
let sync_ok = state.register_sync_finalizer(region, || {});
let async_ok = state.register_async_finalizer(region, async {});
crate::assert_with_log!(!sync_ok, "sync finalizer rejected", false, sync_ok);
crate::assert_with_log!(!async_ok, "async finalizer rejected", false, async_ok);
crate::test_complete!("register_finalizer_fails_when_region_not_open");
}
#[test]
fn register_finalizer_fails_for_nonexistent_region() {
init_test("register_finalizer_fails_for_nonexistent_region");
let mut state = RuntimeState::new();
let fake_region = RegionId::from_arena(ArenaIndex::new(999, 0));
let sync_ok = state.register_sync_finalizer(fake_region, || {});
let async_ok = state.register_async_finalizer(fake_region, async {});
crate::assert_with_log!(!sync_ok, "sync finalizer rejected", false, sync_ok);
crate::assert_with_log!(!async_ok, "async finalizer rejected", false, async_ok);
crate::test_complete!("register_finalizer_fails_for_nonexistent_region");
}
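// Finalizers form a stack: the most recently registered finalizer pops first.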
#[test]
fn pop_region_finalizer_lifo() {
init_test("pop_region_finalizer_lifo");
let mut state = RuntimeState::new();
let region = state.create_root_region(Budget::INFINITE);
let order = std::sync::Arc::new(parking_lot::Mutex::new(Vec::new()));
let o1 = order.clone();
let o2 = order.clone();
let o3 = order.clone();
state.register_sync_finalizer(region, move || o1.lock().push(1));
state.register_sync_finalizer(region, move || o2.lock().push(2));
state.register_sync_finalizer(region, move || o3.lock().push(3));
while let Some(finalizer) = state.pop_region_finalizer(region) {
if let Finalizer::Sync(f) = finalizer {
f();
}
}
let observed = order.lock().clone();
crate::assert_with_log!(
observed == vec![3, 2, 1],
"finalizer order",
vec![3, 2, 1],
observed
);
crate::test_complete!("pop_region_finalizer_lifo");
}
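// run_sync_finalizers pops sync finalizers until it reaches an async one, which is
// returned as a barrier; finalizers registered earlier stay queued behind it.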
#[test]
fn run_sync_finalizers_executes_and_returns_async() {
init_test("run_sync_finalizers_executes_and_returns_async");
let mut state = RuntimeState::new();
let region = state.create_root_region(Budget::INFINITE);
let sync_called = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
let sync_called_clone = sync_called.clone();
state.register_sync_finalizer(region, move || {
sync_called_clone.store(true, std::sync::atomic::Ordering::SeqCst);
});
state.register_async_finalizer(region, async {});
state.register_sync_finalizer(region, || {});
let async_finalizer = state.run_sync_finalizers(region);
let sync_flag = sync_called.load(std::sync::atomic::Ordering::SeqCst);
crate::assert_with_log!(
!sync_flag,
"first sync finalizer NOT called yet",
false,
sync_flag
);
crate::assert_with_log!(
async_finalizer.is_some(),
"async finalizer returned",
true,
async_finalizer.is_some()
);
let is_async = matches!(async_finalizer, Some(Finalizer::Async(_)));
crate::assert_with_log!(is_async, "is async", true, is_async);
let remaining = state.run_sync_finalizers(region);
crate::assert_with_log!(
remaining.is_none(),
"no more async",
true,
remaining.is_none()
);
let sync_flag = sync_called.load(std::sync::atomic::Ordering::SeqCst);
crate::assert_with_log!(sync_flag, "first sync finalizer called", true, sync_flag);
let empty = state.region_finalizers_empty(region);
crate::assert_with_log!(empty, "finalizers cleared", true, empty);
crate::test_complete!("run_sync_finalizers_executes_and_returns_async");
}
#[test]
fn finalizer_history_tracks_sync_registration_run_and_close() {
init_test("finalizer_history_tracks_sync_registration_run_and_close");
let mut state = RuntimeState::new();
let region = state.create_root_region(Budget::INFINITE);
state.now = Time::from_nanos(10);
let registered = state.register_sync_finalizer(region, || {});
crate::assert_with_log!(registered, "registered", true, registered);
state.now = Time::from_nanos(20);
let pending_async = state.run_sync_finalizers(region);
crate::assert_with_log!(
pending_async.is_none(),
"no async barrier",
true,
pending_async.is_none()
);
state.now = Time::from_nanos(30);
state.record_finalizer_close_for_test(region);
crate::assert_with_log!(
state.finalizer_history
== vec![
FinalizerHistoryEvent::Registered {
id: 0,
region,
time: Time::from_nanos(10),
},
FinalizerHistoryEvent::Ran {
id: 0,
time: Time::from_nanos(20),
},
FinalizerHistoryEvent::RegionClosed {
region,
time: Time::from_nanos(30),
},
],
"finalizer history",
"registered -> ran -> closed",
format!("{:?}", state.finalizer_history)
);
crate::test_complete!("finalizer_history_tracks_sync_registration_run_and_close");
}
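// An async finalizer runs as a cleanup task; completing that task records the
// Ran and RegionClosed entries in the finalizer history.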
#[test]
fn task_completed_records_async_finalizer_run_history() {
init_test("task_completed_records_async_finalizer_run_history");
let mut state = RuntimeState::new();
let region = state.create_root_region(Budget::INFINITE);
state.now = Time::from_nanos(10);
let registered = state.register_async_finalizer(region, async {});
crate::assert_with_log!(registered, "registered", true, registered);
let region_record = state
.regions
.get_mut(region.arena_index())
.expect("region missing");
region_record.begin_close(None);
region_record.begin_finalize();
state.finalizing_regions.push(region);
state.now = Time::from_nanos(20);
let scheduled = state.drain_ready_async_finalizers();
crate::assert_with_log!(
scheduled.len() == 1,
"scheduled len",
1usize,
scheduled.len()
);
let task_id = scheduled[0].0;
let task = state
.task_mut(task_id)
.expect("async finalizer task missing");
task.complete(Outcome::Ok(()));
state.now = Time::from_nanos(30);
let _ = state.task_completed(task_id);
crate::assert_with_log!(
state.finalizer_history
== vec![
FinalizerHistoryEvent::Registered {
id: 0,
region,
time: Time::from_nanos(10),
},
FinalizerHistoryEvent::Ran {
id: 0,
time: Time::from_nanos(30),
},
FinalizerHistoryEvent::RegionClosed {
region,
time: Time::from_nanos(30),
},
],
"finalizer history",
"registered -> ran -> closed",
format!("{:?}", state.finalizer_history)
);
crate::test_complete!("task_completed_records_async_finalizer_run_history");
}
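// Cleanup is exempt from admission limits: the async finalizer task is still
// scheduled when max_tasks is zero, and the region stays open until it completes.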
#[test]
fn drain_ready_async_finalizers_runs_async_cleanup_even_with_zero_task_limit() {
init_test("drain_ready_async_finalizers_runs_async_cleanup_even_with_zero_task_limit");
let mut state = RuntimeState::new();
let region = state.create_root_region(Budget::INFINITE);
let set_limits = state.set_region_limits(
region,
RegionLimits {
max_tasks: Some(0),
..RegionLimits::unlimited()
},
);
crate::assert_with_log!(set_limits, "limits set", true, set_limits);
let registered = state.register_async_finalizer(region, async {});
crate::assert_with_log!(registered, "registered", true, registered);
let region_record = state
.regions
.get_mut(region.arena_index())
.expect("region missing");
region_record.begin_close(None);
region_record.begin_finalize();
state.finalizing_regions.push(region);
let scheduled = state.drain_ready_async_finalizers();
crate::assert_with_log!(
scheduled.len() == 1,
"async finalizer task scheduled even when normal task limit is zero",
1usize,
scheduled.len()
);
let task_id = scheduled[0].0;
crate::assert_with_log!(
state.region_finalizer_count(region) == 0,
"async finalizer moved from barrier stack into running cleanup task",
0usize,
state.region_finalizer_count(region)
);
crate::assert_with_log!(
!state.can_region_complete_close(region),
"region must remain uncloseable while async cleanup task is still running",
false,
state.can_region_complete_close(region)
);
state
.task_mut(task_id)
.expect("async finalizer task missing")
.complete(Outcome::Ok(()));
let _ = state.task_completed(task_id);
crate::assert_with_log!(
state.finalizer_history
== vec![
FinalizerHistoryEvent::Registered {
id: 0,
region,
time: Time::ZERO,
},
FinalizerHistoryEvent::Ran {
id: 0,
time: Time::ZERO,
},
FinalizerHistoryEvent::RegionClosed {
region,
time: Time::ZERO,
},
],
"history records cleanup execution and close once the finalizer task finishes",
"registered -> ran -> closed",
format!("{:?}", state.finalizer_history)
);
crate::test_complete!(
"drain_ready_async_finalizers_runs_async_cleanup_even_with_zero_task_limit"
);
}
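// An in-flight async barrier defers finalizers registered before it; they run only
// after the barrier's cleanup task completes, which then lets the region close.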
#[test]
fn drain_ready_async_finalizers_blocks_lower_finalizers_while_async_barrier_runs() {
init_test("drain_ready_async_finalizers_blocks_lower_finalizers_while_async_barrier_runs");
let mut state = RuntimeState::new();
let region = state.create_root_region(Budget::INFINITE);
let sync_runs = Arc::new(std::sync::atomic::AtomicUsize::new(0));
let sync_runs_clone = Arc::clone(&sync_runs);
let registered_sync = state.register_sync_finalizer(region, move || {
sync_runs_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
});
crate::assert_with_log!(registered_sync, "sync registered", true, registered_sync);
let registered_async = state.register_async_finalizer(region, async {});
crate::assert_with_log!(registered_async, "async registered", true, registered_async);
let region_record = state
.regions
.get_mut(region.arena_index())
.expect("region missing");
region_record.begin_close(None);
region_record.begin_finalize();
state.finalizing_regions.push(region);
let first = state.drain_ready_async_finalizers();
crate::assert_with_log!(
first.len() == 1,
"first async barrier scheduled",
1usize,
first.len()
);
crate::assert_with_log!(
sync_runs.load(std::sync::atomic::Ordering::SeqCst) == 0,
"lower sync finalizer has not run yet",
0usize,
sync_runs.load(std::sync::atomic::Ordering::SeqCst)
);
crate::assert_with_log!(
state.region_finalizer_count(region) == 1,
"lower finalizer still queued behind async barrier",
1usize,
state.region_finalizer_count(region)
);
let second = state.drain_ready_async_finalizers();
crate::assert_with_log!(
second.is_empty(),
"second drain does not bypass in-flight async barrier",
true,
second.is_empty()
);
crate::assert_with_log!(
sync_runs.load(std::sync::atomic::Ordering::SeqCst) == 0,
"lower sync finalizer still blocked",
0usize,
sync_runs.load(std::sync::atomic::Ordering::SeqCst)
);
let task_id = first[0].0;
state
.task_mut(task_id)
.expect("async finalizer task missing")
.complete(Outcome::Ok(()));
let _ = state.task_completed(task_id);
crate::assert_with_log!(
sync_runs.load(std::sync::atomic::Ordering::SeqCst) == 1,
"lower sync finalizer runs after async barrier completes",
1usize,
sync_runs.load(std::sync::atomic::Ordering::SeqCst)
);
let region_removed = state.regions.get(region.arena_index()).is_none();
crate::assert_with_log!(
region_removed,
"region closes after deferred lower finalizer runs",
true,
region_removed
);
crate::test_complete!(
"drain_ready_async_finalizers_blocks_lower_finalizers_while_async_barrier_runs"
);
}
#[test]
fn can_region_complete_close_requires_finalizing_state() {
init_test("can_region_complete_close_requires_finalizing_state");
let mut state = RuntimeState::new();
let region = state.create_root_region(Budget::INFINITE);
let can_close = state.can_region_complete_close(region);
crate::assert_with_log!(
!can_close,
"cannot close when not finalizing",
false,
can_close
);
let region_record = state.regions.get_mut(region.arena_index()).expect("region");
region_record.begin_close(None);
region_record.begin_finalize();
let can_close = state.can_region_complete_close(region);
crate::assert_with_log!(can_close, "can close", true, can_close);
crate::test_complete!("can_region_complete_close_requires_finalizing_state");
}
#[test]
fn can_region_complete_close_checks_finalizers() {
init_test("can_region_complete_close_checks_finalizers");
let mut state = RuntimeState::new();
let region = state.create_root_region(Budget::INFINITE);
state.register_sync_finalizer(region, || {});
let region_record = state.regions.get_mut(region.arena_index()).expect("region");
region_record.begin_close(None);
region_record.begin_finalize();
let can_close = state.can_region_complete_close(region);
crate::assert_with_log!(
!can_close,
"cannot close with pending finalizers",
false,
can_close
);
let _ = state.run_sync_finalizers(region);
let can_close = state.can_region_complete_close(region);
crate::assert_with_log!(can_close, "can close", true, can_close);
crate::test_complete!("can_region_complete_close_checks_finalizers");
}
#[test]
fn task_completed_removes_task_from_region() {
init_test("task_completed_removes_task_from_region");
let mut state = RuntimeState::new();
let region = state.create_root_region(Budget::INFINITE);
let task1 = insert_task(&mut state, region);
let task2 = insert_task(&mut state, region);
let task3 = insert_task(&mut state, region);
let region_record = state.regions.get(region.arena_index()).expect("region");
let task_ids = region_record.task_ids();
crate::assert_with_log!(task_ids.len() == 3, "task count", 3usize, task_ids.len());
crate::assert_with_log!(
task_ids.contains(&task1),
"contains task1",
true,
task_ids.contains(&task1)
);
crate::assert_with_log!(
task_ids.contains(&task2),
"contains task2",
true,
task_ids.contains(&task2)
);
crate::assert_with_log!(
task_ids.contains(&task3),
"contains task3",
true,
task_ids.contains(&task3)
);
state
.task_mut(task2)
.expect("task2")
.complete(Outcome::Ok(()));
let waiters = state.task_completed(task2);
crate::assert_with_log!(waiters.is_empty(), "no waiters", true, waiters.is_empty());
let region_record = state.regions.get(region.arena_index()).expect("region");
let task_ids = region_record.task_ids();
crate::assert_with_log!(task_ids.len() == 2, "task count", 2usize, task_ids.len());
crate::assert_with_log!(
task_ids.contains(&task1),
"contains task1",
true,
task_ids.contains(&task1)
);
crate::assert_with_log!(
!task_ids.contains(&task2),
"task2 removed",
false,
task_ids.contains(&task2)
);
crate::assert_with_log!(
task_ids.contains(&task3),
"contains task3",
true,
task_ids.contains(&task3)
);
let removed = state.task(task2).is_none();
crate::assert_with_log!(removed, "task2 removed from state", true, removed);
state
.task_mut(task1)
.expect("task1")
.complete(Outcome::Ok(()));
let _ = state.task_completed(task1);
state
.task_mut(task3)
.expect("task3")
.complete(Outcome::Ok(()));
let _ = state.task_completed(task3);
let region_record = state.regions.get(region.arena_index()).expect("region");
let empty = region_record.task_ids().is_empty();
crate::assert_with_log!(empty, "region tasks empty", true, empty);
crate::test_complete!("task_completed_removes_task_from_region");
}
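// The max_tasks limit rejects further spawns with RegionAtCapacity and leaves the
// already-admitted task and the task arena untouched.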
#[test]
fn spawn_rejected_when_task_limit_reached() {
init_test("spawn_rejected_when_task_limit_reached");
let mut state = RuntimeState::new();
let region = state.create_root_region(Budget::INFINITE);
let limits = RegionLimits {
max_tasks: Some(1),
..RegionLimits::unlimited()
};
let set = state.set_region_limits(region, limits);
crate::assert_with_log!(set, "limits set", true, set);
let (task_id, _handle) = state
.create_task(region, Budget::INFINITE, async { 1_u8 })
.expect("first task");
let result = state.create_task(region, Budget::INFINITE, async { 2_u8 });
let rejected = matches!(result, Err(SpawnError::RegionAtCapacity { .. }));
crate::assert_with_log!(rejected, "spawn rejected", true, rejected);
let region_record = state.regions.get(region.arena_index()).expect("region");
let tasks = region_record.task_ids();
crate::assert_with_log!(tasks.len() == 1, "one task live", 1, tasks.len());
crate::assert_with_log!(
tasks.contains(&task_id),
"task id preserved",
true,
tasks.contains(&task_id)
);
crate::assert_with_log!(
state.tasks_arena().len() == 1,
"arena len stable",
1,
state.tasks_arena().len()
);
crate::test_complete!("spawn_rejected_when_task_limit_reached");
}
#[test]
fn obligation_rejected_when_limit_reached() {
init_test("obligation_rejected_when_limit_reached");
let mut state = RuntimeState::new();
let region = state.create_root_region(Budget::INFINITE);
let limits = RegionLimits {
max_obligations: Some(0),
..RegionLimits::unlimited()
};
let set = state.set_region_limits(region, limits);
crate::assert_with_log!(set, "limits set", true, set);
let holder = insert_task(&mut state, region);
let err = state
.create_obligation(ObligationKind::IoOp, holder, region, None)
.expect_err("obligation limit enforced");
crate::assert_with_log!(
err.kind() == ErrorKind::AdmissionDenied,
"admission denied",
ErrorKind::AdmissionDenied,
err.kind()
);
let pending = state.pending_obligation_count();
crate::assert_with_log!(pending == 0, "no obligations recorded", 0, pending);
crate::test_complete!("obligation_rejected_when_limit_reached");
}
#[test]
fn create_obligation_rejects_missing_holder_task() {
init_test("create_obligation_rejects_missing_holder_task");
let mut state = RuntimeState::new();
let region = state.create_root_region(Budget::INFINITE);
let missing_holder = TaskId::from_arena(ArenaIndex::new(999, 0));
let err = state
.create_obligation(ObligationKind::IoOp, missing_holder, region, None)
.expect_err("missing holder should be rejected");
crate::assert_with_log!(
err.kind() == ErrorKind::TaskNotOwned,
"missing holder rejected as task ownership error",
ErrorKind::TaskNotOwned,
err.kind()
);
crate::assert_with_log!(
state.pending_obligation_count() == 0,
"no obligations created for missing holder",
0usize,
state.pending_obligation_count()
);
crate::test_complete!("create_obligation_rejects_missing_holder_task");
}
#[test]
fn create_obligation_rejects_holder_owned_by_different_region() {
init_test("create_obligation_rejects_holder_owned_by_different_region");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let child = create_child_region(&mut state, root);
let child_task = insert_task(&mut state, child);
let err = state
.create_obligation(ObligationKind::IoOp, child_task, root, None)
.expect_err("cross-region holder should be rejected");
crate::assert_with_log!(
err.kind() == ErrorKind::TaskNotOwned,
"cross-region holder rejected as task ownership error",
ErrorKind::TaskNotOwned,
err.kind()
);
crate::assert_with_log!(
state.pending_obligation_count() == 0,
"no obligations created for cross-region holder",
0usize,
state.pending_obligation_count()
);
crate::test_complete!("create_obligation_rejects_holder_owned_by_different_region");
}
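// After a cancel request the region no longer accepts spawns, so create_task
// fails with SpawnError::RegionClosed.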
#[test]
fn cancel_request_should_prevent_new_spawns() {
init_test("cancel_request_should_prevent_new_spawns");
let mut state = RuntimeState::new();
let region = state.create_root_region(Budget::INFINITE);
let idx = state.insert_task_with(|idx| {
TaskRecord::new_with_time(
TaskId::from_arena(idx),
region,
Budget::INFINITE,
Time::ZERO,
)
});
state
.regions
.get(region.arena_index())
.unwrap()
.add_task(TaskId::from_arena(idx))
.unwrap();
let _ = state.cancel_request(region, &CancelReason::user("stop"), None);
let region_record = state.regions.get(region.arena_index()).expect("region");
let region_state = region_record.state();
let can_spawn = region_state.can_spawn();
crate::assert_with_log!(
!can_spawn,
"region no longer accepts spawns",
false,
can_spawn
);
let result = state.create_task(region, Budget::INFINITE, async { 42 });
let rejected = matches!(result, Err(SpawnError::RegionClosed(_)));
crate::assert_with_log!(rejected, "spawn rejected", true, rejected);
crate::test_complete!("cancel_request_should_prevent_new_spawns");
}
#[test]
fn new_creates_state_without_io_driver() {
init_test("new_creates_state_without_io_driver");
let state = RuntimeState::new();
crate::assert_with_log!(
!state.has_io_driver(),
"no io driver",
false,
state.has_io_driver()
);
crate::assert_with_log!(
state.io_driver().is_none(),
"io driver none",
true,
state.io_driver().is_none()
);
crate::test_complete!("new_creates_state_without_io_driver");
}
#[test]
fn without_reactor_creates_state_without_io_driver() {
init_test("without_reactor_creates_state_without_io_driver");
let state = RuntimeState::without_reactor();
crate::assert_with_log!(
!state.has_io_driver(),
"no io driver",
false,
state.has_io_driver()
);
crate::assert_with_log!(
state.io_driver().is_none(),
"io driver none",
true,
state.io_driver().is_none()
);
crate::test_complete!("without_reactor_creates_state_without_io_driver");
}
#[test]
fn with_reactor_creates_state_with_io_driver() {
init_test("with_reactor_creates_state_with_io_driver");
let reactor = Arc::new(LabReactor::new());
let state = RuntimeState::with_reactor(reactor);
crate::assert_with_log!(
state.has_io_driver(),
"has io driver",
true,
state.has_io_driver()
);
crate::assert_with_log!(
state.io_driver().is_some(),
"io driver some",
true,
state.io_driver().is_some()
);
let driver = state.io_driver().unwrap();
crate::assert_with_log!(driver.is_empty(), "driver empty", true, driver.is_empty());
crate::assert_with_log!(
driver.waker_count() == 0,
"waker count",
0usize,
driver.waker_count()
);
crate::test_complete!("with_reactor_creates_state_with_io_driver");
}
#[test]
fn set_io_driver_injects_driver_into_state() {
init_test("set_io_driver_injects_driver_into_state");
let mut state = RuntimeState::new();
crate::assert_with_log!(
!state.has_io_driver(),
"starts without io driver",
false,
state.has_io_driver()
);
let handle = IoDriverHandle::new(Arc::new(LabReactor::new()));
let waker_state = Arc::new(TestWaker(AtomicBool::new(false)));
let waker = Waker::from(waker_state);
{
let mut driver = handle.lock();
let _ = driver.register_waker(waker);
}
state.set_io_driver(handle);
crate::assert_with_log!(
state.has_io_driver(),
"io driver attached",
true,
state.has_io_driver()
);
let injected = state.io_driver_handle().expect("state io driver");
crate::assert_with_log!(
injected.waker_count() == 1,
"injected handle retained state",
1usize,
injected.waker_count()
);
crate::test_complete!("set_io_driver_injects_driver_into_state");
}
#[test]
fn io_driver_mut_allows_modification() {
init_test("io_driver_mut_allows_modification");
let reactor = Arc::new(LabReactor::new());
let state = RuntimeState::with_reactor(reactor);
let waker_state = Arc::new(TestWaker(AtomicBool::new(false)));
let waker = Waker::from(waker_state);
{
let mut driver = state.io_driver_mut().unwrap();
let _key = driver.register_waker(waker);
}
let waker_count = state.io_driver().unwrap().waker_count();
crate::assert_with_log!(waker_count == 1, "waker count", 1usize, waker_count);
let empty = state.io_driver().unwrap().is_empty();
crate::assert_with_log!(!empty, "driver not empty", false, empty);
crate::test_complete!("io_driver_mut_allows_modification");
}
#[test]
fn is_quiescent_considers_io_driver() {
init_test("is_quiescent_considers_io_driver");
let reactor = Arc::new(LabReactor::new());
let state = RuntimeState::with_reactor(reactor);
let quiescent = state.is_quiescent();
crate::assert_with_log!(quiescent, "initial quiescent", true, quiescent);
let waker_state = Arc::new(TestWaker(AtomicBool::new(false)));
let waker = Waker::from(waker_state);
let key = {
let mut driver = state.io_driver_mut().unwrap();
driver.register_waker(waker)
};
let quiescent = state.is_quiescent();
crate::assert_with_log!(!quiescent, "not quiescent", false, quiescent);
{
let mut driver = state.io_driver_mut().unwrap();
driver.deregister_waker(key);
}
let quiescent = state.is_quiescent();
crate::assert_with_log!(quiescent, "quiescent again", true, quiescent);
crate::test_complete!("is_quiescent_considers_io_driver");
}
#[test]
fn is_quiescent_without_io_driver_ignores_io() {
init_test("is_quiescent_without_io_driver_ignores_io");
let state = RuntimeState::new();
let quiescent = state.is_quiescent();
crate::assert_with_log!(quiescent, "quiescent", true, quiescent);
crate::test_complete!("is_quiescent_without_io_driver_ignores_io");
}
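// End-to-end lifecycle: cancel request moves the region to Closing, tasks drain,
// the sync finalizer runs, the region is removed, and metrics/trace record it.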
#[test]
#[allow(clippy::too_many_lines)]
fn cancel_drain_finalize_full_lifecycle() {
init_test("cancel_drain_finalize_full_lifecycle");
let metrics = Arc::new(TestMetrics::default());
let mut state = RuntimeState::new_with_metrics(metrics.clone());
let root = state.create_root_region(Budget::INFINITE);
let task1 = insert_task(&mut state, root);
let task2 = insert_task(&mut state, root);
let finalizer_called = Arc::new(AtomicBool::new(false));
let finalizer_flag = finalizer_called.clone();
state.register_sync_finalizer(root, move || {
finalizer_flag.store(true, Ordering::SeqCst);
});
let tasks_to_schedule = state.cancel_request(root, &CancelReason::timeout(), None);
crate::assert_with_log!(
tasks_to_schedule.len() == 2,
"both tasks scheduled for cancel",
2usize,
tasks_to_schedule.len()
);
let region_state = state
.regions
.get(root.arena_index())
.expect("region")
.state();
crate::assert_with_log!(
region_state == crate::record::region::RegionState::Closing,
"region closing after cancel request",
crate::record::region::RegionState::Closing,
region_state
);
state
.task_mut(task1)
.expect("task1")
.complete(Outcome::Cancelled(CancelReason::timeout()));
let _ = state.task_completed(task1);
let region_state = state
.regions
.get(root.arena_index())
.expect("region")
.state();
crate::assert_with_log!(
region_state == crate::record::region::RegionState::Closing,
"region still closing with live task",
crate::record::region::RegionState::Closing,
region_state
);
let finalizer_ran = finalizer_called.load(Ordering::SeqCst);
crate::assert_with_log!(
!finalizer_ran,
"finalizer not yet called",
false,
finalizer_ran
);
state
.task_mut(task2)
.expect("task2")
.complete(Outcome::Cancelled(CancelReason::timeout()));
let _ = state.task_completed(task2);
let region_state_removed = state.regions.get(root.arena_index()).is_none();
crate::assert_with_log!(
region_state_removed,
"region closed after all tasks complete (removed)",
true,
region_state_removed
);
let finalizer_ran = finalizer_called.load(Ordering::SeqCst);
crate::assert_with_log!(
finalizer_ran,
"finalizer was called during finalization",
true,
finalizer_ran
);
let cancelled_count = metrics
.completions
.lock()
.iter()
.filter(|o| **o == OutcomeKind::Cancelled)
.count();
crate::assert_with_log!(
cancelled_count == 2,
"cancelled completions count",
2usize,
cancelled_count
);
let events = state.trace.snapshot();
let cancel_events = events
.iter()
.filter(|e| e.kind == TraceEventKind::CancelRequest)
.count();
crate::assert_with_log!(
cancel_events >= 1,
"cancel request trace events",
true,
cancel_events >= 1
);
crate::test_complete!("cancel_drain_finalize_full_lifecycle");
}
#[test]
fn cancel_drain_finalize_nested_regions() {
init_test("cancel_drain_finalize_nested_regions");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let child = create_child_region(&mut state, root);
let root_task = insert_task(&mut state, root);
let child_task = insert_task(&mut state, child);
let _ = state.cancel_request(root, &CancelReason::user("stop"), None);
state
.task_mut(child_task)
.expect("child_task")
.complete(Outcome::Cancelled(CancelReason::parent_cancelled()));
let _ = state.task_completed(child_task);
let child_state_removed = state.regions.get(child.arena_index()).is_none();
crate::assert_with_log!(
child_state_removed,
"child closed after its task completes (removed)",
true,
child_state_removed
);
let root_state = state
.regions
.get(root.arena_index())
.expect("root region")
.state();
let root_closing = matches!(
root_state,
crate::record::region::RegionState::Closing
| crate::record::region::RegionState::Draining
);
crate::assert_with_log!(
root_closing,
"root still closing/draining with live task",
true,
root_closing
);
state
.task_mut(root_task)
.expect("root_task")
.complete(Outcome::Cancelled(CancelReason::user("stop")));
let _ = state.task_completed(root_task);
let root_state_removed = state.regions.get(root.arena_index()).is_none();
crate::assert_with_log!(
root_state_removed,
"root closed after all tasks and children done (removed)",
true,
root_state_removed
);
crate::test_complete!("cancel_drain_finalize_nested_regions");
}
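// Obligations still pending when a cancelled task completes are auto-aborted,
// emitting one ObligationAbort trace event per obligation.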
#[test]
fn obligations_auto_aborted_on_cancelled_task_completion() {
init_test("obligations_auto_aborted_on_cancelled_task_completion");
let mut state = RuntimeState::new();
state.obligation_leak_response = ObligationLeakResponse::Silent;
let region = state.create_root_region(Budget::INFINITE);
let task = insert_task(&mut state, region);
let obl_send = state
.create_obligation(ObligationKind::SendPermit, task, region, None)
.expect("create send permit");
let obl_ack = state
.create_obligation(ObligationKind::Ack, task, region, None)
.expect("create ack");
let obl_io = state
.create_obligation(ObligationKind::IoOp, task, region, None)
.expect("create io op");
crate::assert_with_log!(
state.pending_obligation_count() == 3,
"three pending obligations",
3usize,
state.pending_obligation_count()
);
let _ = state.cancel_request(region, &CancelReason::timeout(), None);
state
.task_mut(task)
.expect("task")
.complete(Outcome::Cancelled(CancelReason::timeout()));
let _ = state.task_completed(task);
for obl_id in [obl_send, obl_ack, obl_io] {
let record = state
.obligations
.get(obl_id.arena_index())
.expect("obligation still in arena");
crate::assert_with_log!(
!record.is_pending(),
"obligation resolved after task cancel",
false,
record.is_pending()
);
}
crate::assert_with_log!(
state.pending_obligation_count() == 0,
"zero pending obligations",
0usize,
state.pending_obligation_count()
);
let events = state.trace.snapshot();
let abort_events = events
.iter()
.filter(|e| e.kind == TraceEventKind::ObligationAbort)
.count();
crate::assert_with_log!(
abort_events == 3,
"three obligation abort trace events",
3usize,
abort_events
);
crate::test_complete!("obligations_auto_aborted_on_cancelled_task_completion");
}
#[test]
fn obligation_commit_before_cancel_then_drain() {
init_test("obligation_commit_before_cancel_then_drain");
let mut state = RuntimeState::new();
let region = state.create_root_region(Budget::INFINITE);
let task = insert_task(&mut state, region);
let obl = state
.create_obligation(ObligationKind::SendPermit, task, region, None)
.expect("create obligation");
let _ = state.commit_obligation(obl).expect("commit before cancel");
crate::assert_with_log!(
state.pending_obligation_count() == 0,
"no pending after commit",
0usize,
state.pending_obligation_count()
);
let _ = state.cancel_request(region, &CancelReason::timeout(), None);
state
.task_mut(task)
.expect("task")
.complete(Outcome::Cancelled(CancelReason::timeout()));
let _ = state.task_completed(task);
let region_state_removed = state.regions.get(region.arena_index()).is_none();
crate::assert_with_log!(
region_state_removed,
"region closed cleanly (removed)",
true,
region_state_removed
);
let events = state.trace.snapshot();
let commit_events = events
.iter()
.filter(|e| e.kind == TraceEventKind::ObligationCommit)
.count();
crate::assert_with_log!(
commit_events == 1,
"one obligation commit event",
1usize,
commit_events
);
crate::test_complete!("obligation_commit_before_cancel_then_drain");
}
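// A pending obligation keeps can_region_complete_close false even after the task
// finishes; committing the obligation unblocks the close.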
#[test]
fn region_close_blocked_by_pending_obligations() {
init_test("region_close_blocked_by_pending_obligations");
let mut state = RuntimeState::new();
state.obligation_leak_response = ObligationLeakResponse::Silent;
let region = state.create_root_region(Budget::INFINITE);
let task = insert_task(&mut state, region);
let obl = state
.create_obligation(ObligationKind::Lease, task, region, None)
.expect("create obligation");
let region_record = state.regions.get_mut(region.arena_index()).expect("region");
region_record.begin_close(None);
region_record.begin_finalize();
state
.task_mut(task)
.expect("task")
.complete(Outcome::Ok(()));
let can_close = state.can_region_complete_close(region);
crate::assert_with_log!(
!can_close,
"cannot close with pending obligation",
false,
can_close
);
let _ = state.commit_obligation(obl).expect("commit obligation");
if let Some(region_rec) = state.regions.get(region.arena_index()) {
region_rec.remove_task(task);
}
let can_close = state.can_region_complete_close(region);
crate::assert_with_log!(
can_close,
"can close after obligation committed",
true,
can_close
);
crate::test_complete!("region_close_blocked_by_pending_obligations");
}
#[test]
fn cancel_with_obligations_full_trace_lifecycle() {
init_test("cancel_with_obligations_full_trace_lifecycle");
let mut state = RuntimeState::new();
let region = state.create_root_region(Budget::INFINITE);
let task = insert_task(&mut state, region);
state.record_task_spawn(task, region);
let _obl = state
.create_obligation(
ObligationKind::SendPermit,
task,
region,
Some("test-permit".to_string()),
)
.expect("create obligation");
let _ = state.cancel_request(region, &CancelReason::deadline(), None);
state
.task_mut(task)
.expect("task")
.complete(Outcome::Cancelled(CancelReason::deadline()));
let _ = state.task_completed(task);
let events = state.trace.snapshot();
let kinds: Vec<_> = events.iter().map(|e| e.kind).collect();
let has_spawn = kinds.contains(&TraceEventKind::Spawn);
let has_reserve = kinds.contains(&TraceEventKind::ObligationReserve);
let has_cancel = kinds.contains(&TraceEventKind::CancelRequest);
let has_abort = kinds.contains(&TraceEventKind::ObligationAbort);
crate::assert_with_log!(has_spawn, "trace has spawn", true, has_spawn);
crate::assert_with_log!(
has_reserve,
"trace has obligation reserve",
true,
has_reserve
);
crate::assert_with_log!(has_cancel, "trace has cancel request", true, has_cancel);
crate::assert_with_log!(has_abort, "trace has obligation abort", true, has_abort);
let reserve_seq = events
.iter()
.find(|e| e.kind == TraceEventKind::ObligationReserve)
.map(|e| e.seq)
.expect("reserve event");
let cancel_seq = events
.iter()
.find(|e| e.kind == TraceEventKind::CancelRequest)
.map(|e| e.seq)
.expect("cancel event");
let abort_seq = events
.iter()
.find(|e| e.kind == TraceEventKind::ObligationAbort)
.map(|e| e.seq)
.expect("abort event");
crate::assert_with_log!(
reserve_seq < cancel_seq,
"reserve before cancel",
true,
reserve_seq < cancel_seq
);
crate::assert_with_log!(
cancel_seq < abort_seq,
"cancel before abort",
true,
cancel_seq < abort_seq
);
let region_state_removed = state.regions.get(region.arena_index()).is_none();
crate::assert_with_log!(
region_state_removed,
"region closed (removed)",
true,
region_state_removed
);
crate::test_complete!("cancel_with_obligations_full_trace_lifecycle");
}
#[test]
fn mixed_obligation_resolution_during_cancel() {
init_test("mixed_obligation_resolution_during_cancel");
let mut state = RuntimeState::new();
let region = state.create_root_region(Budget::INFINITE);
let task = insert_task(&mut state, region);
let obl_committed = state
.create_obligation(ObligationKind::SendPermit, task, region, None)
.expect("create send");
let obl_aborted = state
.create_obligation(ObligationKind::Ack, task, region, None)
.expect("create ack");
let obl_orphaned = state
.create_obligation(ObligationKind::IoOp, task, region, None)
.expect("create io");
let _ = state.commit_obligation(obl_committed).expect("commit send");
let _ = state
.abort_obligation(obl_aborted, ObligationAbortReason::Explicit)
.expect("abort ack");
crate::assert_with_log!(
state.pending_obligation_count() == 1,
"one obligation still pending",
1usize,
state.pending_obligation_count()
);
let _ = state.cancel_request(region, &CancelReason::shutdown(), None);
state
.task_mut(task)
.expect("task")
.complete(Outcome::Cancelled(CancelReason::shutdown()));
let _ = state.task_completed(task);
crate::assert_with_log!(
state.pending_obligation_count() == 0,
"zero pending obligations",
0usize,
state.pending_obligation_count()
);
let orphaned_record = state
.obligations
.get(obl_orphaned.arena_index())
.expect("orphaned obligation");
crate::assert_with_log!(
!orphaned_record.is_pending(),
"orphaned obligation resolved",
false,
orphaned_record.is_pending()
);
let region_state_removed = state.regions.get(region.arena_index()).is_none();
crate::assert_with_log!(
region_state_removed,
"region closed (removed)",
true,
region_state_removed
);
crate::test_complete!("mixed_obligation_resolution_during_cancel");
}
#[test]
fn region_quiescence_requires_no_live_children_or_tasks() {
init_test("region_quiescence_requires_no_live_children_or_tasks");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let child = create_child_region(&mut state, root);
let task = insert_task(&mut state, child);
let can_finalize_root = state.can_region_finalize(root);
crate::assert_with_log!(
!can_finalize_root,
"root cannot finalize with open child",
false,
can_finalize_root
);
let can_finalize_child = state.can_region_finalize(child);
crate::assert_with_log!(
!can_finalize_child,
"child cannot finalize with live task",
false,
can_finalize_child
);
let _ = state.cancel_request(root, &CancelReason::user("done"), None);
state
.task_mut(task)
.expect("task")
.complete(Outcome::Cancelled(CancelReason::parent_cancelled()));
let _ = state.task_completed(task);
let child_state_removed = state.regions.get(child.arena_index()).is_none();
crate::assert_with_log!(
child_state_removed,
"child closed (removed)",
true,
child_state_removed
);
let root_state_removed = state.regions.get(root.arena_index()).is_none();
crate::assert_with_log!(
root_state_removed,
"root closed (removed)",
true,
root_state_removed
);
crate::test_complete!("region_quiescence_requires_no_live_children_or_tasks");
}
#[test]
fn cancel_prevents_new_obligation_creation() {
init_test("cancel_prevents_new_obligation_creation");
let mut state = RuntimeState::new();
let region = state.create_root_region(Budget::INFINITE);
let task = insert_task(&mut state, region);
let _ = state.cancel_request(region, &CancelReason::timeout(), None);
let result = state.create_obligation(ObligationKind::SendPermit, task, region, None);
let rejected = result.is_err();
crate::assert_with_log!(
rejected,
"obligation creation rejected in cancelled region",
true,
rejected
);
crate::assert_with_log!(
state.pending_obligation_count() == 0,
"no obligations created",
0usize,
state.pending_obligation_count()
);
crate::test_complete!("cancel_prevents_new_obligation_creation");
}
#[test]
#[allow(clippy::too_many_lines)]
fn multiple_tasks_obligations_cancel_drain_finalize() {
init_test("multiple_tasks_obligations_cancel_drain_finalize");
let mut state = RuntimeState::new();
let region = state.create_root_region(Budget::INFINITE);
let task_a = insert_task(&mut state, region);
let task_b = insert_task(&mut state, region);
let obl_a = state
.create_obligation(ObligationKind::SendPermit, task_a, region, None)
.expect("obl_a");
let obl_b1 = state
.create_obligation(ObligationKind::Ack, task_b, region, None)
.expect("obl_b1");
let obl_b2 = state
.create_obligation(ObligationKind::Lease, task_b, region, None)
.expect("obl_b2");
crate::assert_with_log!(
state.pending_obligation_count() == 3,
"three pending",
3usize,
state.pending_obligation_count()
);
let _ = state.cancel_request(region, &CancelReason::deadline(), None);
let _ = state.commit_obligation(obl_a).expect("commit obl_a");
state
.task_mut(task_a)
.expect("task_a")
.complete(Outcome::Cancelled(CancelReason::deadline()));
let _ = state.task_completed(task_a);
let region_state = state
.regions
.get(region.arena_index())
.expect("region")
.state();
crate::assert_with_log!(
region_state == crate::record::region::RegionState::Closing,
"region still closing",
crate::record::region::RegionState::Closing,
region_state
);
crate::assert_with_log!(
state.pending_obligation_count() == 2,
"two pending (task_b's)",
2usize,
state.pending_obligation_count()
);
state
.task_mut(task_b)
.expect("task_b")
.complete(Outcome::Cancelled(CancelReason::deadline()));
let _ = state.task_completed(task_b);
crate::assert_with_log!(
state.pending_obligation_count() == 0,
"all obligations resolved",
0usize,
state.pending_obligation_count()
);
let region_state_removed = state.regions.get(region.arena_index()).is_none();
crate::assert_with_log!(
region_state_removed,
"region closed (removed)",
true,
region_state_removed
);
let events = state.trace.snapshot();
let reserve_count = events
.iter()
.filter(|e| e.kind == TraceEventKind::ObligationReserve)
.count();
let commit_count = events
.iter()
.filter(|e| e.kind == TraceEventKind::ObligationCommit)
.count();
let abort_count = events
.iter()
.filter(|e| e.kind == TraceEventKind::ObligationAbort)
.count();
crate::assert_with_log!(
reserve_count == 3,
"three reserve events",
3usize,
reserve_count
);
crate::assert_with_log!(
commit_count == 1,
"one commit event (obl_a)",
1usize,
commit_count
);
crate::assert_with_log!(
abort_count == 2,
"two abort events (obl_b1 + obl_b2)",
2usize,
abort_count
);
let _ = obl_b1;
let _ = obl_b2;
crate::test_complete!("multiple_tasks_obligations_cancel_drain_finalize");
}
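// Linux-only integration test: drive a RuntimeState built on an EpollReactor
// through register -> readiness -> deregister on a unix socket pair, checking
// that quiescence tracking and waker delivery follow the registration.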
#[cfg(target_os = "linux")]
mod epoll_integration {
use super::*;
use crate::runtime::reactor::{EpollReactor, Interest};
use std::io::Write;
use std::os::unix::net::UnixStream;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::task::{Wake, Waker};
use std::time::Duration;
struct FlagWaker(AtomicBool);
impl Wake for FlagWaker {
fn wake(self: Arc<Self>) {
self.0.store(true, Ordering::SeqCst);
}
}
#[test]
fn runtime_state_with_epoll_reactor() {
super::init_test("runtime_state_with_epoll_reactor");
let reactor = Arc::new(EpollReactor::new().expect("create reactor"));
let state = RuntimeState::with_reactor(reactor);
crate::assert_with_log!(
state.has_io_driver(),
"has io driver",
true,
state.has_io_driver()
);
let quiescent = state.is_quiescent();
crate::assert_with_log!(quiescent, "quiescent", true, quiescent);
let (sock_read, mut sock_write) = UnixStream::pair().expect("socket pair");
let waker_state = Arc::new(FlagWaker(AtomicBool::new(false)));
let waker = Waker::from(waker_state.clone());
let registration = {
let mut driver = state.io_driver_mut().unwrap();
driver
.register(&sock_read, Interest::READABLE, waker)
.expect("register")
};
let quiescent = state.is_quiescent();
crate::assert_with_log!(!quiescent, "not quiescent", false, quiescent);
sock_write.write_all(b"hello").expect("write");
let count = {
let mut driver = state.io_driver_mut().unwrap();
driver.turn(Some(Duration::from_millis(100))).expect("turn")
};
crate::assert_with_log!(count >= 1, "event count", true, count >= 1);
let flag = waker_state.0.load(Ordering::SeqCst);
crate::assert_with_log!(flag, "waker fired", true, flag);
{
let mut driver = state.io_driver_mut().unwrap();
driver.deregister(registration).expect("deregister");
}
let quiescent = state.is_quiescent();
crate::assert_with_log!(quiescent, "quiescent", true, quiescent);
crate::test_complete!("runtime_state_with_epoll_reactor");
}
}
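// Helpers for the obligation-leak tests below: set up a runtime whose single
// task holds one pending obligation, then complete the task successfully so
// the configured ObligationLeakResponse decides how the leak is handled.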
fn setup_leakable_obligation(
response: ObligationLeakResponse,
) -> (RuntimeState, RegionId, TaskId, ObligationId) {
let mut state = RuntimeState::new();
state.set_obligation_leak_response(response);
let region = state.create_root_region(Budget::INFINITE);
let task = insert_task(&mut state, region);
let obl = state
.create_obligation(ObligationKind::SendPermit, task, region, None)
.expect("create obligation");
(state, region, task, obl)
}
fn complete_task_ok(state: &mut RuntimeState, task: TaskId) {
state
.task_mut(task)
.expect("task")
.complete(Outcome::Ok(()));
let _ = state.task_completed(task);
}
#[test]
fn leak_response_silent_marks_leaked_no_log() {
init_test("leak_response_silent_marks_leaked_no_log");
let (mut state, _region, task, obl) =
setup_leakable_obligation(ObligationLeakResponse::Silent);
complete_task_ok(&mut state, task);
let record = state.obligations.get(obl.arena_index()).expect("obl");
crate::assert_with_log!(
record.state == ObligationState::Leaked,
"obligation leaked",
ObligationState::Leaked,
record.state
);
crate::assert_with_log!(
state.leak_count() == 1,
"leak count incremented",
1u64,
state.leak_count()
);
crate::test_complete!("leak_response_silent_marks_leaked_no_log");
}
#[test]
fn leak_response_log_marks_leaked() {
init_test("leak_response_log_marks_leaked");
let (mut state, _region, task, obl) =
setup_leakable_obligation(ObligationLeakResponse::Log);
complete_task_ok(&mut state, task);
let record = state.obligations.get(obl.arena_index()).expect("obl");
crate::assert_with_log!(
record.state == ObligationState::Leaked,
"obligation leaked via Log mode",
ObligationState::Leaked,
record.state
);
let events = state.trace.snapshot();
let leak_events = events
.iter()
.filter(|e| e.kind == TraceEventKind::ObligationLeak)
.count();
crate::assert_with_log!(
leak_events == 1,
"one leak trace event",
1usize,
leak_events
);
crate::assert_with_log!(
state.leak_count() == 1,
"leak count",
1u64,
state.leak_count()
);
crate::test_complete!("leak_response_log_marks_leaked");
}
#[test]
fn leak_response_recover_aborts_instead_of_leaking() {
init_test("leak_response_recover_aborts_instead_of_leaking");
let (mut state, _region, task, obl) =
setup_leakable_obligation(ObligationLeakResponse::Recover);
complete_task_ok(&mut state, task);
let record = state.obligations.get(obl.arena_index()).expect("obl");
crate::assert_with_log!(
record.state == ObligationState::Aborted,
"obligation aborted by recovery",
ObligationState::Aborted,
record.state
);
let events = state.trace.snapshot();
let abort_events = events
.iter()
.filter(|e| e.kind == TraceEventKind::ObligationAbort)
.count();
let leak_events = events
.iter()
.filter(|e| e.kind == TraceEventKind::ObligationLeak)
.count();
crate::assert_with_log!(
abort_events >= 1,
"abort trace event from recovery",
true,
abort_events >= 1
);
crate::assert_with_log!(
leak_events == 0,
"no leak trace event in recover mode",
0usize,
leak_events
);
crate::assert_with_log!(
state.leak_count() == 1,
"leak count still incremented",
1u64,
state.leak_count()
);
crate::test_complete!("leak_response_recover_aborts_instead_of_leaking");
}
#[test]
#[should_panic(expected = "obligation leak")]
fn leak_response_panic_panics() {
init_test("leak_response_panic_panics");
let (mut state, _region, task, _obl) =
setup_leakable_obligation(ObligationLeakResponse::Panic);
complete_task_ok(&mut state, task);
}
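// Escalation: with a threshold of 3 and a Panic escalation response, the first
// two leaks are handled in Log mode and the third panics (observed via
// catch_unwind).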
#[test]
fn leak_escalation_from_log_to_panic() {
init_test("leak_escalation_from_log_to_panic");
let mut state = RuntimeState::new();
state.set_obligation_leak_response(ObligationLeakResponse::Log);
state.set_leak_escalation(Some(LeakEscalation::new(3, ObligationLeakResponse::Panic)));
let region = state.create_root_region(Budget::INFINITE);
for i in 0u64..2 {
let task = insert_task(&mut state, region);
state
.create_obligation(ObligationKind::SendPermit, task, region, None)
.expect("create obligation");
complete_task_ok(&mut state, task);
let expected = i + 1;
crate::assert_with_log!(
state.leak_count() == expected,
&format!("leak count after batch {expected}"),
expected,
state.leak_count()
);
}
let task = insert_task(&mut state, region);
state
.create_obligation(ObligationKind::SendPermit, task, region, None)
.expect("create obligation");
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
complete_task_ok(&mut state, task);
}));
crate::assert_with_log!(
result.is_err(),
"escalated to panic at threshold",
true,
result.is_err()
);
crate::test_complete!("leak_escalation_from_log_to_panic");
}
#[test]
fn leak_escalation_from_silent_to_recover() {
init_test("leak_escalation_from_silent_to_recover");
let mut state = RuntimeState::new();
state.set_obligation_leak_response(ObligationLeakResponse::Silent);
state.set_leak_escalation(Some(LeakEscalation::new(
2,
ObligationLeakResponse::Recover,
)));
let region = state.create_root_region(Budget::INFINITE);
let task1 = insert_task(&mut state, region);
let obl1 = state
.create_obligation(ObligationKind::Ack, task1, region, None)
.expect("create");
complete_task_ok(&mut state, task1);
let record1 = state.obligations.get(obl1.arena_index()).expect("obl1");
crate::assert_with_log!(
record1.state == ObligationState::Leaked,
"first leak: Leaked state (silent)",
ObligationState::Leaked,
record1.state
);
let task2 = insert_task(&mut state, region);
let obl2 = state
.create_obligation(ObligationKind::Lease, task2, region, None)
.expect("create");
complete_task_ok(&mut state, task2);
let record2 = state.obligations.get(obl2.arena_index()).expect("obl2");
crate::assert_with_log!(
record2.state == ObligationState::Aborted,
"second leak: Aborted (recovered)",
ObligationState::Aborted,
record2.state
);
crate::assert_with_log!(
state.leak_count() == 2,
"total leak count",
2u64,
state.leak_count()
);
crate::test_complete!("leak_escalation_from_silent_to_recover");
}
#[test]
fn leak_count_accumulates_across_tasks() {
init_test("leak_count_accumulates_across_tasks");
let mut state = RuntimeState::new();
state.set_obligation_leak_response(ObligationLeakResponse::Silent);
let region = state.create_root_region(Budget::INFINITE);
for _ in 0..5 {
let task = insert_task(&mut state, region);
state
.create_obligation(ObligationKind::SendPermit, task, region, None)
.expect("create");
state
.create_obligation(ObligationKind::IoOp, task, region, None)
.expect("create");
complete_task_ok(&mut state, task);
}
crate::assert_with_log!(
state.leak_count() == 10,
"10 cumulative leaks",
10u64,
state.leak_count()
);
crate::test_complete!("leak_count_accumulates_across_tasks");
}
#[test]
fn no_escalation_when_not_configured() {
init_test("no_escalation_when_not_configured");
let mut state = RuntimeState::new();
state.set_obligation_leak_response(ObligationLeakResponse::Silent);
let region = state.create_root_region(Budget::INFINITE);
for _ in 0..100 {
let task = insert_task(&mut state, region);
state
.create_obligation(ObligationKind::SendPermit, task, region, None)
.expect("create");
complete_task_ok(&mut state, task);
}
crate::assert_with_log!(
state.leak_count() == 100,
"100 leaks, no panic",
100u64,
state.leak_count()
);
crate::test_complete!("no_escalation_when_not_configured");
}
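// Cancel cascade across root -> child -> grandchild, each holding one task and
// one obligation: cancelling the root schedules all three tasks, regions close
// bottom-up as their tasks complete, the one committed obligation survives,
// and the unresolved ones show up as ObligationAbort trace events.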
#[test]
#[allow(clippy::too_many_lines)]
fn three_level_cascade_with_obligations() {
init_test("three_level_cascade_with_obligations");
let mut state = RuntimeState::new();
state.obligation_leak_response = ObligationLeakResponse::Silent;
let root = state.create_root_region(Budget::INFINITE);
let child = create_child_region(&mut state, root);
let grandchild = create_child_region(&mut state, child);
let root_task = insert_task(&mut state, root);
let child_task = insert_task(&mut state, child);
let gc_task = insert_task(&mut state, grandchild);
let _root_obl = state
.create_obligation(ObligationKind::SendPermit, root_task, root, None)
.expect("root obl");
let child_obl = state
.create_obligation(ObligationKind::Ack, child_task, child, None)
.expect("child obl");
let _gc_obl = state
.create_obligation(ObligationKind::IoOp, gc_task, grandchild, None)
.expect("gc obl");
crate::assert_with_log!(
state.pending_obligation_count() == 3,
"three pending obligations across tree",
3usize,
state.pending_obligation_count()
);
let tasks_to_schedule = state.cancel_request(root, &CancelReason::user("shutdown"), None);
crate::assert_with_log!(
tasks_to_schedule.len() == 3,
"all three tasks scheduled for cancel",
3usize,
tasks_to_schedule.len()
);
state
.task_mut(gc_task)
.expect("gc_task")
.complete(Outcome::Cancelled(CancelReason::parent_cancelled()));
let _ = state.task_completed(gc_task);
let gc_state_removed = state.regions.get(grandchild.arena_index()).is_none();
crate::assert_with_log!(
gc_state_removed,
"grandchild closed (removed)",
true,
gc_state_removed
);
let child_state_now = state
.regions
.get(child.arena_index())
.expect("child")
.state();
let child_still_active = !matches!(child_state_now, RegionState::Closed);
crate::assert_with_log!(
child_still_active,
"child not yet closed",
true,
child_still_active
);
let _ = state
.commit_obligation(child_obl)
.expect("commit child obl");
state
.task_mut(child_task)
.expect("child_task")
.complete(Outcome::Cancelled(CancelReason::parent_cancelled()));
let _ = state.task_completed(child_task);
let child_state_final_removed = state.regions.get(child.arena_index()).is_none();
crate::assert_with_log!(
child_state_final_removed,
"child closed after task + obligation resolved (removed)",
true,
child_state_final_removed
);
let root_state_mid = state.regions.get(root.arena_index()).expect("root").state();
let root_not_closed = !matches!(root_state_mid, RegionState::Closed);
crate::assert_with_log!(
root_not_closed,
"root not yet closed",
true,
root_not_closed
);
state
.task_mut(root_task)
.expect("root_task")
.complete(Outcome::Cancelled(CancelReason::user("shutdown")));
let _ = state.task_completed(root_task);
let root_state_final_removed = state.regions.get(root.arena_index()).is_none();
crate::assert_with_log!(
root_state_final_removed,
"root closed after full cascade (removed)",
true,
root_state_final_removed
);
crate::assert_with_log!(
state.pending_obligation_count() == 0,
"zero pending obligations after cascade",
0usize,
state.pending_obligation_count()
);
let events = state.trace.snapshot();
let cancel_events = events
.iter()
.filter(|e| e.kind == TraceEventKind::CancelRequest)
.count();
crate::assert_with_log!(
cancel_events >= 1,
"cancel trace events emitted",
true,
cancel_events >= 1
);
let abort_events = events
.iter()
.filter(|e| e.kind == TraceEventKind::ObligationAbort)
.count();
crate::assert_with_log!(
abort_events >= 2,
"at least two obligation aborts (gc + root)",
true,
abort_events >= 2
);
crate::test_complete!("three_level_cascade_with_obligations");
}
#[test]
fn obligation_resolve_advances_draining_region() {
init_test("obligation_resolve_advances_draining_region");
let mut state = RuntimeState::new();
let region = state.create_root_region(Budget::INFINITE);
let task = insert_task(&mut state, region);
let obl1 = state
.create_obligation(ObligationKind::SendPermit, task, region, None)
.expect("obl1");
let obl2 = state
.create_obligation(ObligationKind::Ack, task, region, None)
.expect("obl2");
let _ = state.cancel_request(region, &CancelReason::timeout(), None);
let _ = state.commit_obligation(obl1).expect("commit obl1");
let _ = state
.abort_obligation(obl2, ObligationAbortReason::Cancel)
.expect("abort obl2");
state
.task_mut(task)
.expect("task")
.complete(Outcome::Cancelled(CancelReason::timeout()));
let _ = state.task_completed(task);
let region_state_removed = state.regions.get(region.arena_index()).is_none();
crate::assert_with_log!(
region_state_removed,
"region closed after obligation resolve + task complete (removed)",
true,
region_state_removed
);
crate::assert_with_log!(
state.pending_obligation_count() == 0,
"zero pending",
0usize,
state.pending_obligation_count()
);
crate::test_complete!("obligation_resolve_advances_draining_region");
}
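// Exercise every ShardGuard constructor and check which shards each one locks:
// for_spawn takes regions + tasks, for_obligation takes regions + obligations,
// and task completion / cancel / obligation-resolve take all three.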
#[test]
fn shardguard_locking_patterns_exercised() {
use crate::runtime::ShardGuard;
use crate::runtime::ShardedState;
use crate::runtime::sharded_state::ShardedConfig;
init_test("shardguard_locking_patterns_exercised");
let trace = TraceBufferHandle::new(1024);
let metrics: Arc<dyn MetricsProvider> = Arc::new(TestMetrics::default());
let config = ShardedConfig {
io_driver: None,
timer_driver: None,
logical_clock_mode: LogicalClockMode::Lamport,
cancel_attribution: CancelAttributionConfig::default(),
entropy_source: Arc::new(OsEntropy),
blocking_pool: None,
obligation_leak_response: ObligationLeakResponse::Log,
leak_escalation: None,
observability: None,
};
let shards = ShardedState::new(trace, metrics, config);
{
let guard = ShardGuard::for_spawn(&shards);
let has_regions = guard.regions.is_some();
let has_tasks = guard.tasks.is_some();
let no_obligations = guard.obligations.is_none();
drop(guard);
crate::assert_with_log!(
has_regions && has_tasks && no_obligations,
"for_spawn: B+A only",
true,
has_regions && has_tasks && no_obligations
);
}
{
let guard = ShardGuard::for_obligation(&shards);
let has_regions = guard.regions.is_some();
let no_tasks = guard.tasks.is_none();
let has_obligations = guard.obligations.is_some();
drop(guard);
crate::assert_with_log!(
has_regions && no_tasks && has_obligations,
"for_obligation: B+C only",
true,
has_regions && no_tasks && has_obligations
);
}
{
let guard = ShardGuard::for_task_completed(&shards);
let all_present =
guard.regions.is_some() && guard.tasks.is_some() && guard.obligations.is_some();
drop(guard);
crate::assert_with_log!(all_present, "for_task_completed: B+A+C", true, all_present);
}
{
let guard = ShardGuard::for_cancel(&shards);
let all_present =
guard.regions.is_some() && guard.tasks.is_some() && guard.obligations.is_some();
drop(guard);
crate::assert_with_log!(all_present, "for_cancel: B+A+C", true, all_present);
}
{
let guard = ShardGuard::for_obligation_resolve(&shards);
let all_present =
guard.regions.is_some() && guard.tasks.is_some() && guard.obligations.is_some();
drop(guard);
crate::assert_with_log!(
all_present,
"for_obligation_resolve: B+A+C",
true,
all_present
);
}
crate::test_complete!("shardguard_locking_patterns_exercised");
}
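// In Silent leak mode a task that completes Ok while still holding two pending
// obligations must not wedge its closing region: the region is removed and two
// ObligationLeak trace events are recorded.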
#[test]
fn task_completed_ok_with_leaked_obligations_closes_region() {
init_test("task_completed_ok_with_leaked_obligations_closes_region");
let mut state = RuntimeState::new();
state.obligation_leak_response = ObligationLeakResponse::Silent;
let region = state.create_root_region(Budget::INFINITE);
let task = insert_task(&mut state, region);
let _obl1 = state
.create_obligation(ObligationKind::SendPermit, task, region, None)
.expect("obl1");
let _obl2 = state
.create_obligation(ObligationKind::Ack, task, region, None)
.expect("obl2");
{
let region_record = state.regions.get(region.arena_index()).expect("region");
region_record.begin_close(None);
}
crate::assert_with_log!(
state.pending_obligation_count() == 2,
"two pending obligations",
2usize,
state.pending_obligation_count()
);
state
.task_mut(task)
.expect("task")
.complete(Outcome::Ok(()));
let _ = state.task_completed(task);
let region_state_removed = state.regions.get(region.arena_index()).is_none();
crate::assert_with_log!(
region_state_removed,
"region closed despite leaked obligations (Silent mode) (removed)",
true,
region_state_removed
);
let events = state.trace.snapshot();
let leak_events = events
.iter()
.filter(|e| e.kind == TraceEventKind::ObligationLeak)
.count();
crate::assert_with_log!(
leak_events == 2,
"two obligation leak trace events",
2usize,
leak_events
);
crate::test_complete!("task_completed_ok_with_leaked_obligations_closes_region");
}
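// Leak detection during finalization must wait for task cleanup:
// advance_region_state on a finalizing region leaves the task's pending
// obligation untouched (no leak count, no trace event), and only
// task_completed marks the leak and lets the region close.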
#[test]
fn finalizing_leak_detection_waits_for_task_cleanup() {
init_test("finalizing_leak_detection_waits_for_task_cleanup");
let mut state = RuntimeState::new();
state.set_obligation_leak_response(ObligationLeakResponse::Silent);
let region = state.create_root_region(Budget::INFINITE);
let task = insert_task(&mut state, region);
state
.create_obligation(ObligationKind::SendPermit, task, region, None)
.expect("create obligation");
{
let region_record = state.regions.get(region.arena_index()).expect("region");
region_record.begin_close(None);
region_record.begin_finalize();
}
state
.task_mut(task)
.expect("task")
.complete(Outcome::Ok(()));
state.advance_region_state(region);
crate::assert_with_log!(
state.pending_obligation_count() == 1,
"pending obligation preserved until task cleanup",
1usize,
state.pending_obligation_count()
);
crate::assert_with_log!(
state.leak_count() == 0,
"no leaks emitted before task cleanup",
0u64,
state.leak_count()
);
let early_leak_events = state
.trace
.snapshot()
.into_iter()
.filter(|event| event.kind == TraceEventKind::ObligationLeak)
.count();
crate::assert_with_log!(
early_leak_events == 0,
"no leak trace events before task cleanup",
0usize,
early_leak_events
);
let _ = state.task_completed(task);
crate::assert_with_log!(
state.pending_obligation_count() == 0,
"task_completed resolves leaked obligation",
0usize,
state.pending_obligation_count()
);
crate::assert_with_log!(
state.leak_count() == 1,
"exactly one leak emitted after task cleanup",
1u64,
state.leak_count()
);
let region_state_removed = state.regions.get(region.arena_index()).is_none();
crate::assert_with_log!(
region_state_removed,
"region closes after task cleanup handles leak",
true,
region_state_removed
);
crate::test_complete!("finalizing_leak_detection_waits_for_task_cleanup");
}
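// cancel_sibling_tasks must cancel every other task in the region while
// leaving the task that triggered the cancellation untouched (it stays in
// Created).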
#[test]
fn cancel_sibling_tasks_preserves_triggering_child() {
init_test("cancel_sibling_tasks_preserves_triggering_child");
let mut state = RuntimeState::new();
let region = state.create_root_region(Budget::INFINITE);
let task_a = insert_task(&mut state, region);
let task_b = insert_task(&mut state, region);
let task_c = insert_task(&mut state, region);
let task_d = insert_task(&mut state, region);
let reason = CancelReason::fail_fast().with_message("sibling failed");
let to_cancel = state.cancel_sibling_tasks(region, task_b, &reason);
let cancelled_ids: Vec<TaskId> = to_cancel.iter().map(|(id, _)| *id).collect();
crate::assert_with_log!(
!cancelled_ids.contains(&task_b),
"triggering child not cancelled",
false,
cancelled_ids.contains(&task_b)
);
crate::assert_with_log!(
cancelled_ids.len() == 3,
"three siblings cancelled",
3usize,
cancelled_ids.len()
);
for &expected in &[task_a, task_c, task_d] {
crate::assert_with_log!(
cancelled_ids.contains(&expected),
"sibling in cancel list",
true,
cancelled_ids.contains(&expected)
);
}
let b_record = state.task(task_b).expect("task_b");
crate::assert_with_log!(
matches!(b_record.state, TaskState::Created),
"triggering child state unchanged",
true,
matches!(b_record.state, TaskState::Created)
);
for &sib in &[task_a, task_c, task_d] {
let record = state.task(sib).expect("sibling");
let is_cancel_requested = record.state.is_cancelling();
crate::assert_with_log!(
is_cancel_requested,
"sibling is cancelling",
true,
is_cancel_requested
);
}
crate::test_complete!("cancel_sibling_tasks_preserves_triggering_child");
}
#[test]
fn bottom_up_cascade_without_cancel() {
init_test("bottom_up_cascade_without_cancel");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let child = create_child_region(&mut state, root);
let grandchild = create_child_region(&mut state, child);
let gc_task = insert_task(&mut state, grandchild);
let child_task = insert_task(&mut state, child);
let root_task = insert_task(&mut state, root);
{
let region = state.regions.get(root.arena_index()).expect("root");
region.begin_close(None);
}
{
let region = state.regions.get(child.arena_index()).expect("child");
region.begin_close(None);
}
{
let region = state
.regions
.get(grandchild.arena_index())
.expect("grandchild");
region.begin_close(None);
}
state
.task_mut(gc_task)
.expect("gc_task")
.complete(Outcome::Ok(()));
let _ = state.task_completed(gc_task);
let gc_state_removed = state.regions.get(grandchild.arena_index()).is_none();
crate::assert_with_log!(
gc_state_removed,
"grandchild closed after task done (removed)",
true,
gc_state_removed
);
let child_state = state
.regions
.get(child.arena_index())
.expect("child")
.state();
let child_not_closed = !matches!(child_state, RegionState::Closed);
crate::assert_with_log!(
child_not_closed,
"child not closed (task alive)",
true,
child_not_closed
);
state
.task_mut(child_task)
.expect("child_task")
.complete(Outcome::Ok(()));
let _ = state.task_completed(child_task);
let child_state_final_removed = state.regions.get(child.arena_index()).is_none();
crate::assert_with_log!(
child_state_final_removed,
"child closed after task done + grandchild closed (removed)",
true,
child_state_final_removed
);
let root_state = state.regions.get(root.arena_index()).expect("root").state();
let root_not_closed = !matches!(root_state, RegionState::Closed);
crate::assert_with_log!(
root_not_closed,
"root not closed (task alive)",
true,
root_not_closed
);
state
.task_mut(root_task)
.expect("root_task")
.complete(Outcome::Ok(()));
let _ = state.task_completed(root_task);
let root_state_final_removed = state.regions.get(root.arena_index()).is_none();
crate::assert_with_log!(
root_state_final_removed,
"root closed after full cascade (removed)",
true,
root_state_final_removed
);
crate::test_complete!("bottom_up_cascade_without_cancel");
}
#[test]
fn obligation_leak_recover_mode_allows_region_close() {
init_test("obligation_leak_recover_mode_allows_region_close");
let mut state = RuntimeState::new();
state.obligation_leak_response = ObligationLeakResponse::Recover;
let region = state.create_root_region(Budget::INFINITE);
let task = insert_task(&mut state, region);
let _obl1 = state
.create_obligation(ObligationKind::Lease, task, region, None)
.expect("lease");
let _obl2 = state
.create_obligation(ObligationKind::IoOp, task, region, None)
.expect("io_op");
{
let region_record = state.regions.get(region.arena_index()).expect("region");
region_record.begin_close(None);
}
crate::assert_with_log!(
state.pending_obligation_count() == 2,
"two pending obligations",
2usize,
state.pending_obligation_count()
);
state
.task_mut(task)
.expect("task")
.complete(Outcome::Err(Error::new(ErrorKind::Internal)));
let _ = state.task_completed(task);
let region_state_removed = state.regions.get(region.arena_index()).is_none();
crate::assert_with_log!(
region_state_removed,
"region closed in Recover mode (removed)",
true,
region_state_removed
);
let events = state.trace.snapshot();
let abort_events = events
.iter()
.filter(|e| e.kind == TraceEventKind::ObligationAbort)
.count();
crate::assert_with_log!(
abort_events == 2,
"two obligation aborts in recover mode",
2usize,
abort_events
);
crate::test_complete!("obligation_leak_recover_mode_allows_region_close");
}
#[test]
fn mixed_obligation_resolution_during_cancel_cascade() {
init_test("mixed_obligation_resolution_during_cancel_cascade");
let mut state = RuntimeState::new();
state.obligation_leak_response = ObligationLeakResponse::Silent;
let root = state.create_root_region(Budget::INFINITE);
let child = create_child_region(&mut state, root);
let root_task = insert_task(&mut state, root);
let child_task1 = insert_task(&mut state, child);
let child_task2 = insert_task(&mut state, child);
let root_obl = state
.create_obligation(ObligationKind::SendPermit, root_task, root, None)
.expect("root obl");
let child_obl1 = state
.create_obligation(ObligationKind::Ack, child_task1, child, None)
.expect("child obl1");
let _child_obl2 = state
.create_obligation(ObligationKind::IoOp, child_task2, child, None)
.expect("child obl2");
let _ = state.commit_obligation(root_obl).expect("commit root obl");
let _ = state.cancel_request(root, &CancelReason::user("test"), None);
let _ = state
.abort_obligation(child_obl1, ObligationAbortReason::Cancel)
.expect("abort child obl1");
state
.task_mut(child_task1)
.expect("child_task1")
.complete(Outcome::Cancelled(CancelReason::parent_cancelled()));
let _ = state.task_completed(child_task1);
state
.task_mut(child_task2)
.expect("child_task2")
.complete(Outcome::Cancelled(CancelReason::parent_cancelled()));
let _ = state.task_completed(child_task2);
let child_state_removed = state.regions.get(child.arena_index()).is_none();
crate::assert_with_log!(
child_state_removed,
"child closed (removed)",
true,
child_state_removed
);
state
.task_mut(root_task)
.expect("root_task")
.complete(Outcome::Cancelled(CancelReason::user("test")));
let _ = state.task_completed(root_task);
let root_state_removed = state.regions.get(root.arena_index()).is_none();
crate::assert_with_log!(
root_state_removed,
"root closed after mixed resolution (removed)",
true,
root_state_removed
);
crate::assert_with_log!(
state.pending_obligation_count() == 0,
"zero pending",
0usize,
state.pending_obligation_count()
);
crate::test_complete!("mixed_obligation_resolution_during_cancel_cascade");
}
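// Minimal MetricsProvider that records only region_closed calls, used below to
// check the metric fires exactly once, with the right region id, when the root
// closes.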
#[derive(Default)]
struct RegionCloseMetrics {
closed: Mutex<Vec<(RegionId, Duration)>>,
}
impl MetricsProvider for RegionCloseMetrics {
fn task_spawned(&self, _: RegionId, _: TaskId) {}
fn task_completed(&self, _: TaskId, _: OutcomeKind, _: Duration) {}
fn region_created(&self, _: RegionId, _: Option<RegionId>) {}
fn region_closed(&self, id: RegionId, lifetime: Duration) {
self.closed.lock().push((id, lifetime));
}
fn cancellation_requested(&self, _: RegionId, _: CancelKind) {}
fn drain_completed(&self, _: RegionId, _: Duration) {}
fn deadline_set(&self, _: RegionId, _: Duration) {}
fn deadline_exceeded(&self, _: RegionId) {}
fn deadline_warning(&self, _: &str, _: &'static str, _: Duration) {}
fn deadline_violation(&self, _: &str, _: Duration) {}
fn deadline_remaining(&self, _: &str, _: Duration) {}
fn checkpoint_interval(&self, _: &str, _: Duration) {}
fn task_stuck_detected(&self, _: &str) {}
fn obligation_created(&self, _: RegionId) {}
fn obligation_discharged(&self, _: RegionId) {}
fn obligation_leaked(&self, _: RegionId) {}
fn scheduler_tick(&self, _: usize, _: Duration) {}
}
#[test]
#[allow(clippy::significant_drop_tightening)]
fn region_closed_metric_fires_on_close() {
init_test("region_closed_metric_fires_on_close");
let metrics = Arc::new(RegionCloseMetrics::default());
let mut state = RuntimeState::new_with_metrics(metrics.clone());
let root = state.create_root_region(Budget::INFINITE);
let task = insert_task(&mut state, root);
{
let region = state.regions.get(root.arena_index()).expect("root");
region.begin_close(None);
}
state
.task_mut(task)
.expect("task")
.complete(Outcome::Ok(()));
let _ = state.task_completed(task);
{
let closed = metrics.closed.lock();
crate::assert_with_log!(
closed.len() == 1,
"region_closed metric fired exactly once",
1usize,
closed.len()
);
crate::assert_with_log!(
closed[0].0 == root,
"correct region ID in metric",
root,
closed[0].0
);
}
crate::test_complete!("region_closed_metric_fires_on_close");
}
#[test]
fn leak_count_exact_for_multiple_obligations() {
init_test("leak_count_exact_for_multiple_obligations");
let mut state = RuntimeState::new();
state.set_obligation_leak_response(ObligationLeakResponse::Silent);
let region = state.create_root_region(Budget::INFINITE);
let task = insert_task(&mut state, region);
for _ in 0..5 {
state
.create_obligation(ObligationKind::SendPermit, task, region, None)
.expect("create obligation");
}
complete_task_ok(&mut state, task);
crate::assert_with_log!(
state.leak_count() == 5,
"leak_count is exactly N, not inflated by reentrance",
5u64,
state.leak_count()
);
crate::test_complete!("leak_count_exact_for_multiple_obligations");
}
#[test]
fn nested_parent_leaks_are_not_suppressed_by_child_leak_handling() {
init_test("nested_parent_leaks_are_not_suppressed_by_child_leak_handling");
let mut state = RuntimeState::new();
state.set_obligation_leak_response(ObligationLeakResponse::Silent);
let root = state.create_root_region(Budget::INFINITE);
let child = create_child_region(&mut state, root);
let root_task = insert_task(&mut state, root);
let child_task = insert_task(&mut state, child);
state
.create_obligation(ObligationKind::Lease, root_task, root, None)
.expect("root obligation");
state
.create_obligation(ObligationKind::Ack, child_task, child, None)
.expect("child obligation");
state
.regions
.get(root.arena_index())
.expect("root missing")
.begin_close(None);
state
.regions
.get(child.arena_index())
.expect("child missing")
.begin_close(None);
let _ = state.remove_task(root_task);
state
.regions
.get(root.arena_index())
.expect("root missing")
.remove_task(root_task);
state
.task_mut(child_task)
.expect("child task missing")
.complete(Outcome::Ok(()));
let _ = state.task_completed(child_task);
crate::assert_with_log!(
state.pending_obligation_count() == 0,
"all nested leaks resolved",
0usize,
state.pending_obligation_count()
);
crate::assert_with_log!(
state.leak_count() == 2,
"both child and parent leaks counted exactly once",
2u64,
state.leak_count()
);
let leak_events = state
.trace
.snapshot()
.into_iter()
.filter(|event| event.kind == TraceEventKind::ObligationLeak)
.count();
crate::assert_with_log!(
leak_events == 2,
"trace records both nested leaks",
2usize,
leak_events
);
let root_removed = state.regions.get(root.arena_index()).is_none();
crate::assert_with_log!(
root_removed,
"parent region closes after nested leak handling",
true,
root_removed
);
crate::test_complete!("nested_parent_leaks_are_not_suppressed_by_child_leak_handling");
}
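// Smoke tests for the derived Debug / Clone / Copy impls on the introspection
// snapshot types.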
#[test]
fn budget_snapshot_debug_clone_copy() {
let s = BudgetSnapshot {
deadline: Some(1_000_000),
poll_quota: 128,
cost_quota: None,
priority: 5,
};
let dbg = format!("{s:?}");
assert!(dbg.contains("BudgetSnapshot"), "{dbg}");
let copied = s;
let cloned = s;
assert_eq!(copied.priority, cloned.priority);
}
#[test]
fn cancel_kind_snapshot_debug_clone() {
let k = CancelKindSnapshot::User;
let dbg = format!("{k:?}");
assert!(dbg.contains("User"), "{dbg}");
let cloned = k;
let dbg2 = format!("{cloned:?}");
assert_eq!(dbg, dbg2);
}
#[test]
fn region_state_snapshot_debug_clone() {
let s = RegionStateSnapshot::Open;
let dbg = format!("{s:?}");
assert!(dbg.contains("Open"), "{dbg}");
let cloned = s;
let dbg2 = format!("{cloned:?}");
assert_eq!(dbg, dbg2);
}
#[test]
fn event_data_snapshot_preserves_worker_replay_linkage() {
let task = TaskId::from_arena(ArenaIndex::new(1, 2));
let region = RegionId::from_arena(ArenaIndex::new(3, 4));
let obligation = ObligationId::from_arena(ArenaIndex::new(5, 6));
let snapshot = EventDataSnapshot::from_trace_data(&TraceData::Worker {
worker_id: "worker-a".to_string(),
job_id: 77,
decision_seq: 91,
replay_hash: 0x00C0_FFEE,
task,
region,
obligation,
});
match snapshot {
EventDataSnapshot::Worker {
worker_id,
job_id,
decision_seq,
replay_hash,
task: task_snapshot,
region: region_snapshot,
obligation: obligation_snapshot,
} => {
assert_eq!(worker_id, "worker-a");
assert_eq!(job_id, 77);
assert_eq!(decision_seq, 91);
assert_eq!(replay_hash, 0x00C0_FFEE);
assert_eq!(task_snapshot, IdSnapshot::from(task));
assert_eq!(region_snapshot, IdSnapshot::from(region));
assert_eq!(obligation_snapshot, IdSnapshot::from(obligation));
}
other => panic!("expected worker snapshot, got {other:?}"),
}
}
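// Property-based metamorphic tests over LabRuntime region lifecycles: closing
// twice is idempotent, cancelling an already-closed region is a no-op, a
// closed region keeps its original terminal cancel reason, child closes never
// auto-close the parent, and arbitrary close/cancel sequences preserve the
// no-orphan and no-children-on-close invariants.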
mod metamorphic_region_close_tests {
use super::*;
use crate::lab::config::LabConfig;
use crate::lab::runtime::LabRuntime;
use proptest::prelude::*;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
#[derive(Debug, Clone, PartialEq)]
struct RegionCloseOutcome {
region_id: RegionId,
close_successful: bool,
final_state: crate::record::region::RegionState,
task_count: usize,
child_count: usize,
pending_obligations: usize,
cancel_reason: Option<CancelReason>,
}
impl RegionCloseOutcome {
fn from_region(runtime: &LabRuntime, region_id: RegionId) -> Option<Self> {
let state = &runtime.state;
let regions = &state.regions;
regions.get(region_id.arena_index()).map(|region| Self {
region_id,
close_successful: region.state() == crate::record::region::RegionState::Closed,
final_state: region.state(),
task_count: region.task_count(),
child_count: region.child_count(),
pending_obligations: region.pending_obligations(),
cancel_reason: region.cancel_reason(),
})
}
}
fn cancel_reason_for_variant(variant: u8) -> CancelReason {
match variant {
0 => CancelReason::default(),
1 => CancelReason::timeout(),
2 => CancelReason::resource_unavailable(),
_ => CancelReason::user("redundant close"),
}
}
#[test]
fn mr_region_close_idempotency() {
proptest!(|(seed in any::<u64>())| {
let config = LabConfig::new(seed).max_steps(1000);
let mut runtime = LabRuntime::new(config);
let region_id = runtime.state.create_root_region(Budget::default());
let _first_close_result = {
let state = &mut runtime.state;
let region = state.regions.get_mut(region_id.arena_index()).unwrap();
let begin_result = region.begin_close(None);
if begin_result {
let _ = region.begin_finalize();
region.complete_close()
} else {
false
}
};
let first_outcome = RegionCloseOutcome::from_region(&runtime, region_id);
let second_close_result = {
let state = &mut runtime.state;
let region = state.regions.get_mut(region_id.arena_index()).unwrap();
region.complete_close()
};
let second_outcome = RegionCloseOutcome::from_region(&runtime, region_id);
prop_assert_eq!(second_close_result, false, "Second close should return false (idempotent)");
prop_assert_eq!(&first_outcome, &second_outcome, "Region state should be unchanged by second close");
if let Some(outcome) = first_outcome {
prop_assert_eq!(outcome.final_state, crate::record::region::RegionState::Closed);
}
});
}
#[test]
fn mr_cancel_closed_region_noop() {
proptest!(|(seed in any::<u64>(), cancel_reason_variant in 0..3u8)| {
let config = LabConfig::new(seed).max_steps(1000);
let mut runtime = LabRuntime::new(config);
let region_id = runtime.state.create_root_region(Budget::default());
{
let state = &mut runtime.state;
let region = state.regions.get_mut(region_id.arena_index()).unwrap();
let _ = region.begin_close(None);
let _ = region.begin_finalize();
let _ = region.complete_close();
}
let outcome_before_cancel = RegionCloseOutcome::from_region(&runtime, region_id);
let cancel_reason = cancel_reason_for_variant(cancel_reason_variant);
{
let state = &mut runtime.state;
let region = state.regions.get_mut(region_id.arena_index()).unwrap();
let _ = region.begin_close(Some(cancel_reason));
}
let outcome_after_cancel = RegionCloseOutcome::from_region(&runtime, region_id);
prop_assert_eq!(outcome_before_cancel, outcome_after_cancel,
"Cancelling closed region should have no effect");
});
}
#[test]
fn mr_closed_region_preserves_terminal_cancel_reason() {
proptest!(|(seed in any::<u64>(), initial_variant in 0..4u8, followup_variant in 0..4u8)| {
let config = LabConfig::new(seed).max_steps(1000);
let mut runtime = LabRuntime::new(config);
let region_id = runtime.state.create_root_region(Budget::default());
let initial_reason = cancel_reason_for_variant(initial_variant);
let followup_reason = cancel_reason_for_variant(followup_variant);
{
let state = &mut runtime.state;
let region = state.regions.get_mut(region_id.arena_index()).unwrap();
let _ = region.begin_close(Some(initial_reason.clone()));
let _ = region.begin_finalize();
let _ = region.complete_close();
}
let outcome_before_redundant_close = RegionCloseOutcome::from_region(&runtime, region_id);
{
let state = &mut runtime.state;
let region = state.regions.get_mut(region_id.arena_index()).unwrap();
let redundant_close_result = region.begin_close(Some(followup_reason));
prop_assert!(!redundant_close_result, "redundant close on terminal region should be a no-op");
}
let outcome_after_redundant_close = RegionCloseOutcome::from_region(&runtime, region_id);
prop_assert_eq!(
&outcome_before_redundant_close,
&outcome_after_redundant_close,
"redundant close must preserve the terminal cancel reason and region snapshot"
);
if let Some(outcome) = &outcome_after_redundant_close {
prop_assert_eq!(
outcome.cancel_reason.clone(),
Some(initial_reason),
"terminal cancel reason should remain the original close cause"
);
}
});
}
#[test]
fn mr_child_close_containment() {
proptest!(|(seed in any::<u64>(), num_children in 1..5usize)| {
let config = LabConfig::new(seed).max_steps(2000);
let mut runtime = LabRuntime::new(config);
let parent_id = runtime.state.create_root_region(Budget::default());
let mut child_ids = Vec::new();
for _ in 0..num_children {
let child_id = runtime.state.create_child_region(parent_id, Budget::default()).unwrap();
child_ids.push(child_id);
}
for &child_id in &child_ids {
let state = &mut runtime.state;
let child = state.regions.get_mut(child_id.arena_index()).unwrap();
let _ = child.begin_close(None);
let _ = child.begin_finalize();
let _ = child.complete_close();
}
let parent_outcome_before = RegionCloseOutcome::from_region(&runtime, parent_id);
prop_assert!(parent_outcome_before.is_some());
if let Some(outcome) = parent_outcome_before {
prop_assert_ne!(outcome.final_state, crate::record::region::RegionState::Closed,
"Parent should not auto-close when children close");
}
{
let state = &mut runtime.state;
let parent = state.regions.get_mut(parent_id.arena_index()).unwrap();
let _ = parent.begin_close(None);
let _ = parent.begin_finalize();
let _ = parent.complete_close();
}
let parent_outcome_after = RegionCloseOutcome::from_region(&runtime, parent_id);
prop_assert!(parent_outcome_after.is_some());
if let Some(outcome) = parent_outcome_after {
prop_assert_eq!(outcome.final_state, crate::record::region::RegionState::Closed);
prop_assert_eq!(outcome.child_count, 0, "Closed parent should have no children");
}
});
}
#[test]
fn mr_no_orphan_invariant() {
proptest!(|(seed in any::<u64>(), num_operations in 5..20usize)| {
let config = LabConfig::new(seed).max_steps(3000).worker_count(2);
let mut runtime = LabRuntime::new(config);
let region_id = runtime.state.create_root_region(Budget::default());
let spawned_tasks = Arc::new(AtomicUsize::new(0));
let completed_tasks = Arc::new(AtomicUsize::new(0));
let close_attempted = Arc::new(AtomicBool::new(false));
for i in 0..num_operations {
let spawned_count = spawned_tasks.clone();
let completed_count = completed_tasks.clone();
let close_flag = close_attempted.clone();
if i % 3 == 0 && !close_flag.load(Ordering::Relaxed) {
close_flag.store(true, Ordering::Relaxed);
let _close_task = futures_lite::future::block_on(async move {
Ok::<(), ()>(())
});
} else {
let task_spawned = spawned_count.clone();
let task_completed = completed_count.clone();
let task_result = futures_lite::future::block_on(async move {
task_spawned.fetch_add(1, Ordering::Relaxed);
task_completed.fetch_add(1, Ordering::Relaxed);
Ok::<(), ()>(())
});
let _ = task_result;
}
}
runtime.run_until_quiescent();
let region_outcome = RegionCloseOutcome::from_region(&runtime, region_id);
if let Some(outcome) = region_outcome {
prop_assert_eq!(outcome.task_count, 0,
"Region should have no remaining tasks (no orphans)");
if outcome.close_successful {
let spawned = spawned_tasks.load(Ordering::Relaxed);
let completed = completed_tasks.load(Ordering::Relaxed);
prop_assert!(spawned >= completed,
"Completed tasks should not exceed spawned tasks");
}
}
});
}
#[test]
fn mr_composite_region_lifecycle() {
proptest!(|(seed in any::<u64>(), operation_sequence in prop::collection::vec(0..4u8, 3..10))| {
let config = LabConfig::new(seed).max_steps(5000);
let mut runtime = LabRuntime::new(config);
let parent_id = runtime.state.create_root_region(Budget::default());
let child_id = runtime.state.create_child_region(parent_id, Budget::default()).unwrap();
let mut parent_close_count = 0;
let mut child_close_count = 0;
let mut cancel_attempts = 0;
for &op in &operation_sequence {
match op {
0 => {
let state = &mut runtime.state;
if let Some(parent) = state.regions.get_mut(parent_id.arena_index()) {
if parent.begin_close(None) {
let _ = parent.begin_finalize();
let _ = parent.complete_close();
}
parent_close_count += 1;
}
}
1 => {
let state = &mut runtime.state;
if let Some(child) = state.regions.get_mut(child_id.arena_index()) {
if child.begin_close(None) {
let _ = child.begin_finalize();
let _ = child.complete_close();
}
child_close_count += 1;
}
}
2 => {
let state = &mut runtime.state;
if let Some(parent) = state.regions.get_mut(parent_id.arena_index()) {
let _ = parent.begin_close(Some(CancelReason::user("close")));
cancel_attempts += 1;
}
}
3 => {
let state = &mut runtime.state;
if let Some(child) = state.regions.get_mut(child_id.arena_index()) {
let _ = child.begin_close(Some(CancelReason::user("close")));
cancel_attempts += 1;
}
}
_ => unreachable!(),
}
}
let parent_outcome = RegionCloseOutcome::from_region(&runtime, parent_id);
let child_outcome = RegionCloseOutcome::from_region(&runtime, child_id);
prop_assert!(
parent_close_count + child_close_count + cancel_attempts <= operation_sequence.len(),
"each operation increments at most one counter"
);
if let (Some(parent), Some(child)) = (parent_outcome, child_outcome) {
if parent.close_successful && child.close_successful {
prop_assert_eq!(parent.child_count, 0, "Closed parent should have no children");
}
if parent.close_successful {
prop_assert_eq!(parent.task_count, 0, "Closed parent should have no tasks");
}
if child.close_successful {
prop_assert_eq!(child.task_count, 0, "Closed child should have no tasks");
}
}
});
}
}
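// Metamorphic properties of cancel cause-chain attribution: the chain records
// the full ancestry without truncation, cancellation schedules ancestors
// before descendants, the scheduling depth profile does not depend on the
// cancel reason, cause chains are unaffected by sibling noise, and a closed
// region keeps its original terminal reason.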
mod metamorphic_cancel_cause_chain_tests {
use super::*;
use proptest::prelude::*;
use std::collections::HashMap;
fn reason_from_variant(variant: u8) -> CancelReason {
match variant {
0 => CancelReason::deadline().with_message("root deadline"),
1 => CancelReason::timeout().with_message("rpc timeout"),
2 => CancelReason::resource_unavailable().with_message("peer unavailable"),
_ => CancelReason::user("operator stop"),
}
}
#[derive(Debug, Clone, PartialEq)]
struct BranchCancelSnapshot {
branch_region_reason: CancelReason,
leaf_region_reason: CancelReason,
branch_task_reason: CancelReason,
leaf_task_reason: CancelReason,
}
fn branch_cancel_snapshot_with_sibling_noise(
sibling_count: usize,
reason: &CancelReason,
) -> BranchCancelSnapshot {
let mut state = RuntimeState::new();
state.set_cancel_attribution_config(CancelAttributionConfig::new(8, usize::MAX));
let root = state.create_root_region(Budget::INFINITE);
let branch = create_child_region(&mut state, root);
let leaf = create_child_region(&mut state, branch);
let branch_task = insert_task(&mut state, branch);
let leaf_task = insert_task(&mut state, leaf);
for _ in 0..sibling_count {
let sibling = create_child_region(&mut state, root);
let _ = insert_task(&mut state, sibling);
let niece = create_child_region(&mut state, sibling);
let _ = insert_task(&mut state, niece);
}
let _ = state.cancel_request(branch, reason, None);
let branch_region_reason = state
.regions
.get(branch.arena_index())
.and_then(RegionRecord::cancel_reason)
.expect("branch cancel reason missing");
let leaf_region_reason = state
.regions
.get(leaf.arena_index())
.and_then(RegionRecord::cancel_reason)
.expect("leaf cancel reason missing");
let branch_task_reason = match &state
.tasks
.get(branch_task.arena_index())
.expect("branch task missing")
.state
{
TaskState::CancelRequested { reason, .. } => reason.clone(),
other => panic!("expected branch task to be cancelling, got {other:?}"),
};
let leaf_task_reason = match &state
.tasks
.get(leaf_task.arena_index())
.expect("leaf task missing")
.state
{
TaskState::CancelRequested { reason, .. } => reason.clone(),
other => panic!("expected leaf task to be cancelling, got {other:?}"),
};
BranchCancelSnapshot {
branch_region_reason,
leaf_region_reason,
branch_task_reason,
leaf_task_reason,
}
}
#[test]
fn mr_cancel_cause_chain_tracks_full_lineage_without_truncation() {
proptest!(|(
nesting_depth in 1..7usize,
extra_depth_budget in 0..3usize,
reason_variant in 0..4u8
)| {
let mut state = RuntimeState::new();
let full_lineage_depth = nesting_depth + 1;
state.set_cancel_attribution_config(CancelAttributionConfig::new(
full_lineage_depth + extra_depth_budget,
usize::MAX,
));
let root = state.create_root_region(Budget::INFINITE);
let mut lineage = vec![root];
for _ in 0..nesting_depth {
let parent = *lineage.last().expect("lineage has root");
let child = create_child_region(&mut state, parent);
lineage.push(child);
}
let leaf = *lineage.last().expect("leaf region missing");
let leaf_task = insert_task(&mut state, leaf);
let original_reason = reason_from_variant(reason_variant);
let expected_root_kind = original_reason.kind;
let expected_root_message = original_reason.message.clone();
let _ = state.cancel_request(root, &original_reason, None);
for (depth_idx, &region_id) in lineage.iter().enumerate() {
let region_record = state
.regions
.get(region_id.arena_index())
.expect("region missing");
let region_reason = region_record.cancel_reason();
let reason = region_reason
.as_ref()
.expect("region cancel reason missing");
prop_assert_eq!(
reason.chain_depth(),
depth_idx + 1,
"depth {} should expose the full ancestry",
depth_idx
);
prop_assert!(
!reason.any_truncated(),
"full-depth attribution should not truncate at depth {}",
depth_idx
);
if depth_idx == 0 {
prop_assert_eq!(reason.kind, expected_root_kind);
} else {
prop_assert_eq!(reason.kind, CancelKind::ParentCancelled);
prop_assert_eq!(reason.origin_region, lineage[depth_idx - 1]);
}
let root_cause = reason.root_cause();
prop_assert_eq!(root_cause.kind, expected_root_kind);
prop_assert_eq!(
root_cause.message.as_deref(),
expected_root_message.as_deref()
);
}
let leaf_task_record = state.tasks.get(leaf_task.arena_index()).expect("task missing");
match &leaf_task_record.state {
TaskState::CancelRequested { reason, .. } => {
prop_assert_eq!(reason.kind, CancelKind::ParentCancelled);
prop_assert_eq!(reason.origin_region, lineage[lineage.len() - 2]);
prop_assert_eq!(reason.chain_depth(), full_lineage_depth);
prop_assert!(!reason.any_truncated());
prop_assert_eq!(reason.root_cause().kind, expected_root_kind);
prop_assert_eq!(
reason.root_cause().message.as_deref(),
expected_root_message.as_deref()
);
}
other => {
prop_assert!(false, "expected CancelRequested task state, got {other:?}");
}
}
});
}
#[test]
fn mr_parent_cancel_schedules_ancestors_before_descendants() {
proptest!(|(
child_count in 1..5usize,
grandchildren_per_child in 1..4usize,
reason_variant in 0..4u8
)| {
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let root_task = insert_task(&mut state, root);
let mut depth_by_task = HashMap::from([(root_task, 0usize)]);
for _ in 0..child_count {
let child = create_child_region(&mut state, root);
let child_task = insert_task(&mut state, child);
depth_by_task.insert(child_task, 1);
for _ in 0..grandchildren_per_child {
let grandchild = create_child_region(&mut state, child);
let grandchild_task = insert_task(&mut state, grandchild);
depth_by_task.insert(grandchild_task, 2);
}
}
let scheduled = state.cancel_request(root, &reason_from_variant(reason_variant), None);
prop_assert_eq!(
scheduled.len(),
depth_by_task.len(),
"initial cancel cascade should schedule each task exactly once"
);
let scheduled_depths: Vec<_> = scheduled
.iter()
.map(|(task_id, _priority)| {
*depth_by_task
.get(task_id)
.expect("scheduled task missing from depth map")
})
.collect();
prop_assert_eq!(scheduled_depths.first().copied(), Some(0));
prop_assert!(
scheduled_depths.windows(2).all(|pair| pair[0] <= pair[1]),
"cancel scheduling should not visit descendants before ancestors: {scheduled_depths:?}"
);
});
}
#[test]
fn mr_cancel_depth_profile_is_reason_invariant() {
proptest!(|(
child_count in 1..5usize,
grandchildren_per_child in 1..4usize,
lhs_reason_variant in 0..4u8,
rhs_reason_variant in 0..4u8
)| {
let build_depth_profile = |reason_variant: u8| {
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let root_task = insert_task(&mut state, root);
let mut depth_by_task = HashMap::from([(root_task, 0usize)]);
for _ in 0..child_count {
let child = create_child_region(&mut state, root);
let child_task = insert_task(&mut state, child);
depth_by_task.insert(child_task, 1);
for _ in 0..grandchildren_per_child {
let grandchild = create_child_region(&mut state, child);
let grandchild_task = insert_task(&mut state, grandchild);
depth_by_task.insert(grandchild_task, 2);
}
}
state
.cancel_request(root, &reason_from_variant(reason_variant), None)
.into_iter()
.map(|(task_id, _priority)| {
*depth_by_task
.get(&task_id)
.expect("scheduled task missing from depth map")
})
.collect::<Vec<_>>()
};
let lhs_profile = build_depth_profile(lhs_reason_variant);
let rhs_profile = build_depth_profile(rhs_reason_variant);
prop_assert_eq!(
lhs_profile,
rhs_profile,
"cancel reason variants should not perturb ancestor-before-descendant scheduling"
);
});
}
#[test]
fn mr_cancel_cause_chain_is_stable_under_sibling_noise() {
proptest!(|(sibling_count in 1..5usize, reason_variant in 0..4u8)| {
let reason = reason_from_variant(reason_variant);
let baseline = branch_cancel_snapshot_with_sibling_noise(0, &reason);
let noisy = branch_cancel_snapshot_with_sibling_noise(sibling_count, &reason);
prop_assert_eq!(
noisy,
baseline,
"sibling regions outside the cancelled branch should not perturb cause chains"
);
});
}
#[test]
fn mr_cancel_request_after_close_preserves_terminal_reason() {
proptest!(|(initial_variant in 0..4u8, followup_variant in 0..4u8)| {
let mut state = RuntimeState::new();
let region_id = state.create_root_region(Budget::INFINITE);
let initial_reason = reason_from_variant(initial_variant);
let followup_reason = reason_from_variant(followup_variant);
{
let region = state
.regions
.get_mut(region_id.arena_index())
.expect("region missing");
prop_assert!(region.begin_close(Some(initial_reason.clone())));
prop_assert!(region.begin_finalize());
prop_assert!(region.complete_close());
}
let tasks_to_cancel = state.cancel_request(region_id, &followup_reason, None);
let region = state
.regions
.get(region_id.arena_index())
.expect("region missing");
prop_assert!(tasks_to_cancel.is_empty());
prop_assert_eq!(region.state(), crate::record::region::RegionState::Closed);
prop_assert_eq!(region.cancel_reason(), Some(initial_reason));
});
}
}
}