use crate::cx::Cx;
use crate::tracing_compat::trace;
use crate::types::{
Budget, CancelPhase, CancelReason, CancelWitness, CxInner, Outcome, RegionId, TaskId, Time,
};
use parking_lot::RwLock;
use smallvec::SmallVec;
use std::sync::Arc;
use std::sync::atomic::{AtomicU8, Ordering};
use std::task::Waker;
#[cfg(feature = "tracing-integration")]
use std::time::Instant;
pub type TaskOutcome = Outcome<(), crate::error::Error>;
#[derive(Debug, Clone)]
pub enum TaskState {
Created,
Running,
CancelRequested {
reason: CancelReason,
cleanup_budget: Budget,
},
Cancelling {
reason: CancelReason,
cleanup_budget: Budget,
},
Finalizing {
reason: CancelReason,
cleanup_budget: Budget,
},
Completed(TaskOutcome),
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum TaskPhase {
Created = 0,
Running = 1,
CancelRequested = 2,
Cancelling = 3,
Finalizing = 4,
Completed = 5,
}
impl TaskPhase {
#[inline]
#[must_use]
pub const fn is_valid_transition(self, next: Self) -> bool {
matches!(
(self as u8, next as u8),
(0, 1 | 2 | 5)
| (1, 2 | 5)
| (2, 2 | 3 | 5)
| (3, 3..=5)
| (4, 4..=5)
)
}
}
#[derive(Debug)]
pub struct TaskPhaseCell {
inner: AtomicU8,
}
impl TaskPhaseCell {
#[inline]
#[must_use]
pub fn new(phase: TaskPhase) -> Self {
Self {
inner: AtomicU8::new(phase as u8),
}
}
#[inline]
#[must_use]
pub fn load(&self) -> TaskPhase {
match self.inner.load(Ordering::Acquire) {
0 => TaskPhase::Created,
1 => TaskPhase::Running,
2 => TaskPhase::CancelRequested,
3 => TaskPhase::Cancelling,
4 => TaskPhase::Finalizing,
5 => TaskPhase::Completed,
v => {
debug_assert!(false, "invalid TaskPhase value: {v}");
TaskPhase::Completed
}
}
}
pub fn store(&self, phase: TaskPhase) {
#[cfg(debug_assertions)]
{
let current = self.load();
debug_assert!(
current.is_valid_transition(phase),
"invalid TaskPhase transition: {current:?} -> {phase:?}"
);
}
self.inner.store(phase as u8, Ordering::Release);
}
}
#[derive(Debug, Default)]
pub struct TaskWakeState {
state: AtomicU8,
}
#[repr(u8)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum WakeState {
Idle = 0,
Polling = 1,
Notified = 2,
}
impl TaskWakeState {
#[must_use]
pub fn new() -> Self {
Self::default()
}
#[inline]
pub fn notify(&self) -> bool {
let prev = self
.state
.swap(WakeState::Notified as u8, Ordering::Release);
prev == WakeState::Idle as u8
}
#[inline]
pub fn begin_poll(&self) {
self.state
.store(WakeState::Polling as u8, Ordering::Relaxed);
}
#[inline]
pub fn finish_poll(&self) -> bool {
match self.state.compare_exchange(
WakeState::Polling as u8,
WakeState::Idle as u8,
Ordering::Release,
Ordering::Acquire,
) {
Ok(_) => false,
Err(current) => current == WakeState::Notified as u8,
}
}
#[inline]
pub fn clear(&self) {
self.state.store(WakeState::Idle as u8, Ordering::Release);
}
#[inline]
#[must_use]
pub fn is_notified(&self) -> bool {
self.state.load(Ordering::Acquire) == WakeState::Notified as u8
}
}
impl TaskState {
#[inline]
#[must_use]
pub fn is_terminal(&self) -> bool {
matches!(self, Self::Completed(_))
}
#[inline]
#[must_use]
pub fn is_cancelling(&self) -> bool {
matches!(
self,
Self::CancelRequested { .. } | Self::Cancelling { .. } | Self::Finalizing { .. }
)
}
#[inline]
#[must_use]
pub fn can_be_polled(&self) -> bool {
matches!(
self,
Self::Running
| Self::CancelRequested { .. }
| Self::Cancelling { .. }
| Self::Finalizing { .. }
)
}
}
#[derive(Debug)]
pub struct TaskRecord {
pub id: TaskId,
pub owner: RegionId,
pub state: TaskState,
pub phase: TaskPhaseCell,
pub wake_state: Arc<TaskWakeState>,
pub cx_inner: Option<Arc<RwLock<CxInner>>>,
pub cx: Option<Cx>,
pub created_at: Time,
pub polls_remaining: u32,
pub total_polls: u64,
#[cfg(feature = "tracing-integration")]
pub created_instant: Instant,
pub last_polled_step: u64,
pub waiters: SmallVec<[TaskId; 4]>,
pub cached_waker: Option<(Waker, u8)>,
pub cached_cancel_waker: Option<(Waker, u8)>,
pub cancel_epoch: u64,
pub is_local: bool,
pub pinned_worker: Option<usize>,
pub next_in_queue: Option<TaskId>,
pub prev_in_queue: Option<TaskId>,
pub queue_tag: u8,
pub heap_index: Option<u32>,
pub sched_priority: u8,
pub sched_generation: u64,
}
impl TaskRecord {
#[must_use]
pub fn new(id: TaskId, owner: RegionId, budget: Budget) -> Self {
Self::new_with_time(id, owner, budget, Time::ZERO)
}
#[must_use]
pub fn new_with_time(id: TaskId, owner: RegionId, budget: Budget, created_at: Time) -> Self {
Self {
id,
owner,
state: TaskState::Created,
phase: TaskPhaseCell::new(TaskPhase::Created),
wake_state: Arc::new(TaskWakeState::new()),
cx_inner: None, cx: None,
created_at,
polls_remaining: budget.poll_quota,
total_polls: 0,
#[cfg(feature = "tracing-integration")]
created_instant: Instant::now(),
last_polled_step: 0,
waiters: SmallVec::new(),
cached_waker: None,
cached_cancel_waker: None,
cancel_epoch: 0,
is_local: false,
pinned_worker: None,
next_in_queue: None,
prev_in_queue: None,
queue_tag: 0,
heap_index: None,
sched_priority: 0,
sched_generation: 0,
}
}
#[inline]
#[must_use]
pub const fn created_at(&self) -> Time {
self.created_at
}
#[inline]
pub fn set_cx_inner(&mut self, inner: Arc<RwLock<CxInner>>) {
self.cx_inner = Some(inner);
}
pub fn set_cx(&mut self, cx: Cx) {
self.cx = Some(cx);
}
pub fn mark_polled(&mut self, step: u64) {
self.last_polled_step = step;
}
pub fn increment_polls(&mut self) {
self.total_polls += 1;
}
#[inline]
#[must_use]
pub fn is_runnable(&self) -> bool {
matches!(&self.state, TaskState::Created | TaskState::Running) || self.state.can_be_polled()
}
#[inline]
#[must_use]
pub fn state_name(&self) -> &'static str {
match &self.state {
TaskState::Created => "Created",
TaskState::Running => "Running",
TaskState::CancelRequested { .. } => "CancelRequested",
TaskState::Cancelling { .. } => "Cancelling",
TaskState::Finalizing { .. } => "Finalizing",
TaskState::Completed(_) => "Completed",
}
}
#[inline]
#[must_use]
pub fn phase(&self) -> TaskPhase {
self.phase.load()
}
pub fn request_cancel(&mut self, reason: CancelReason) -> bool {
let budget = reason.cleanup_budget();
self.request_cancel_with_budget(reason, budget)
}
#[allow(clippy::too_many_lines)]
#[allow(clippy::used_underscore_binding)]
pub fn request_cancel_with_budget(
&mut self,
reason: CancelReason,
cleanup_budget: Budget,
) -> bool {
if self.state.is_terminal() {
return false;
}
if let Some(inner) = &self.cx_inner {
let mut guard = inner.write();
guard.cancel_requested = true;
guard
.fast_cancel
.store(true, std::sync::atomic::Ordering::Release);
}
let mut updated_reason_for_inner = None;
let result = match &mut self.state {
TaskState::CancelRequested {
reason: existing_reason,
cleanup_budget: existing_budget,
} => {
self.phase.store(TaskPhase::CancelRequested);
trace!(
task_id = ?self.id,
region_id = ?self.owner,
cancel_kind = ?reason.kind,
"cancel reason strengthened (already CancelRequested)"
);
existing_reason.strengthen(&reason);
*existing_budget = existing_budget.combine(cleanup_budget);
updated_reason_for_inner = Some(existing_reason.clone());
false
}
TaskState::Cancelling {
reason: existing_reason,
cleanup_budget: b,
} => {
self.phase.store(TaskPhase::Cancelling);
trace!(
task_id = ?self.id,
region_id = ?self.owner,
cancel_kind = ?reason.kind,
"cancel reason strengthened (in cleanup)"
);
existing_reason.strengthen(&reason);
let new_budget = b.combine(cleanup_budget);
*b = new_budget;
updated_reason_for_inner = Some(existing_reason.clone());
if let Some(inner) = &self.cx_inner {
let mut guard = inner.write();
guard.budget = new_budget;
guard.budget_baseline = new_budget;
}
self.polls_remaining = self.polls_remaining.min(new_budget.poll_quota);
false
}
TaskState::Finalizing {
reason: existing_reason,
cleanup_budget: b,
} => {
self.phase.store(TaskPhase::Finalizing);
trace!(
task_id = ?self.id,
region_id = ?self.owner,
cancel_kind = ?reason.kind,
"cancel reason strengthened (in cleanup)"
);
existing_reason.strengthen(&reason);
let new_budget = b.combine(cleanup_budget);
*b = new_budget;
updated_reason_for_inner = Some(existing_reason.clone());
if let Some(inner) = &self.cx_inner {
let mut guard = inner.write();
guard.budget = new_budget;
guard.budget_baseline = new_budget;
}
self.polls_remaining = self.polls_remaining.min(new_budget.poll_quota);
false
}
TaskState::Created | TaskState::Running => {
let prev_state = self.state_name();
#[cfg(not(feature = "tracing-integration"))]
let _ = prev_state;
let requested_reason = reason.clone();
if self.cancel_epoch == 0 {
self.cancel_epoch = 1;
} else {
self.cancel_epoch = self.cancel_epoch.saturating_add(1);
}
crate::tracing_compat::debug!(
task_id = ?self.id,
region_id = ?self.owner,
old_state = prev_state,
new_state = "CancelRequested",
cancel_kind = ?reason.kind,
cleanup_poll_quota = cleanup_budget.poll_quota,
"task cancel requested"
);
self.state = TaskState::CancelRequested {
reason,
cleanup_budget,
};
self.phase.store(TaskPhase::CancelRequested);
updated_reason_for_inner = Some(requested_reason);
true
}
TaskState::Completed(_) => false,
};
if let Some(reason) = updated_reason_for_inner {
if let Some(inner) = &self.cx_inner {
let mut guard = inner.write();
guard.cancel_reason = Some(reason);
}
}
if let Some(inner) = &self.cx_inner {
let waker = {
let guard = inner.read();
if guard.cancel_requested {
guard.cancel_waker.clone()
} else {
None
}
};
if let Some(waker) = waker {
waker.wake_by_ref();
}
}
result
}
#[must_use]
pub fn cancel_witness(&self) -> Option<CancelWitness> {
if self.cancel_epoch == 0 {
return None;
}
let (phase, reason) = match &self.state {
TaskState::CancelRequested { reason, .. } => (CancelPhase::Requested, reason.clone()),
TaskState::Cancelling { reason, .. } => (CancelPhase::Cancelling, reason.clone()),
TaskState::Finalizing { reason, .. } => (CancelPhase::Finalizing, reason.clone()),
TaskState::Completed(Outcome::Cancelled(reason)) => {
(CancelPhase::Completed, reason.clone())
}
_ => return None,
};
Some(CancelWitness::new(
self.id,
self.owner,
self.cancel_epoch,
phase,
reason,
))
}
pub fn start_running(&mut self) -> bool {
match self.state {
TaskState::Created => {
trace!(
task_id = ?self.id,
region_id = ?self.owner,
old_state = "Created",
new_state = "Running",
"task state transition"
);
self.state = TaskState::Running;
self.phase.store(TaskPhase::Running);
true
}
_ => false,
}
}
#[allow(clippy::used_underscore_binding, clippy::no_effect_underscore_binding)]
pub fn complete(&mut self, outcome: TaskOutcome) -> bool {
if self.state.is_terminal() {
return false;
}
let outcome = match (&self.state, outcome) {
(
TaskState::CancelRequested { reason, .. }
| TaskState::Cancelling { reason, .. }
| TaskState::Finalizing { reason, .. },
Outcome::Ok(()) | Outcome::Err(_),
) => Outcome::Cancelled(reason.clone()),
(
TaskState::CancelRequested { reason, .. }
| TaskState::Cancelling { reason, .. }
| TaskState::Finalizing { reason, .. },
Outcome::Cancelled(outcome_reason),
) => {
let mut final_reason = reason.clone();
final_reason.strengthen(&outcome_reason);
Outcome::Cancelled(final_reason)
}
(_, outcome) => outcome,
};
if matches!(outcome, Outcome::Cancelled(_)) && self.cancel_epoch == 0 {
self.cancel_epoch = 1;
}
#[cfg(feature = "tracing-integration")]
{
let prev_state = self.state_name();
let outcome_label = match &outcome {
Outcome::Ok(()) => "Ok",
Outcome::Err(_) => "Err",
Outcome::Cancelled(_) => "Cancelled",
Outcome::Panicked(_) => "Panicked",
};
let duration_us = self
.created_instant
.elapsed()
.as_micros()
.min(u128::from(u64::MAX)) as u64;
let total_polls = self.total_polls;
crate::tracing_compat::debug!(
task_id = ?self.id,
region_id = ?self.owner,
old_state = prev_state,
new_state = "Completed",
outcome_kind = outcome_label,
duration_us = duration_us,
poll_count = total_polls,
"task completed"
);
}
self.state = TaskState::Completed(outcome);
self.phase.store(TaskPhase::Completed);
true
}
pub fn add_waiter(&mut self, waiter: TaskId) {
if !self.waiters.contains(&waiter) {
self.waiters.push(waiter);
}
}
pub fn acknowledge_cancel(&mut self) -> Option<CancelReason> {
match &self.state {
TaskState::CancelRequested {
reason,
cleanup_budget,
} => {
let reason = reason.clone();
let budget = *cleanup_budget;
trace!(
task_id = ?self.id,
region_id = ?self.owner,
old_state = "CancelRequested",
new_state = "Cancelling",
cancel_kind = ?reason.kind,
cleanup_poll_quota = budget.poll_quota,
cleanup_priority = budget.priority,
"task acknowledged cancellation"
);
if let Some(inner) = &self.cx_inner {
let mut guard = inner.write();
guard.budget = budget;
guard.budget_baseline = budget;
}
self.polls_remaining = budget.poll_quota;
self.state = TaskState::Cancelling {
reason: reason.clone(),
cleanup_budget: budget,
};
self.phase.store(TaskPhase::Cancelling);
Some(reason)
}
_ => None,
}
}
pub fn cleanup_done(&mut self) -> bool {
match &self.state {
TaskState::Cancelling {
reason,
cleanup_budget,
} => {
let reason = reason.clone();
let budget = *cleanup_budget;
trace!(
task_id = ?self.id,
region_id = ?self.owner,
old_state = "Cancelling",
new_state = "Finalizing",
cancel_kind = ?reason.kind,
finalizer_budget_poll_quota = budget.poll_quota,
finalizer_budget_priority = budget.priority,
"task cleanup done, entering finalization"
);
self.state = TaskState::Finalizing {
reason,
cleanup_budget: budget,
};
self.phase.store(TaskPhase::Finalizing);
true
}
_ => false,
}
}
#[allow(clippy::no_effect_underscore_binding)]
pub fn finalize_done(&mut self) -> bool {
self.finalize_done_with_witness().is_some()
}
#[allow(clippy::no_effect_underscore_binding)]
pub fn finalize_done_with_witness(&mut self) -> Option<CancelWitness> {
let TaskState::Finalizing {
reason,
cleanup_budget,
} = &self.state
else {
return None;
};
let reason = reason.clone();
let budget = *cleanup_budget;
#[cfg(feature = "tracing-integration")]
{
let duration_us = self
.created_instant
.elapsed()
.as_micros()
.min(u128::from(u64::MAX)) as u64;
let total_polls = self.total_polls;
crate::tracing_compat::debug!(
task_id = ?self.id,
region_id = ?self.owner,
old_state = "Finalizing",
new_state = "Completed",
outcome_kind = "Cancelled",
cancel_kind = ?reason.kind,
finalizer_budget_poll_quota = budget.poll_quota,
finalizer_budget_priority = budget.priority,
duration_us = duration_us,
poll_count = total_polls,
"task finalization done"
);
}
let _ = budget;
self.state = TaskState::Completed(Outcome::Cancelled(reason.clone()));
self.phase.store(TaskPhase::Completed);
Some(CancelWitness::new(
self.id,
self.owner,
self.cancel_epoch,
CancelPhase::Completed,
reason,
))
}
#[must_use]
pub fn cancel_reason(&self) -> Option<&CancelReason> {
match &self.state {
TaskState::CancelRequested { reason, .. }
| TaskState::Cancelling { reason, .. }
| TaskState::Finalizing { reason, .. } => Some(reason),
_ => None,
}
}
#[must_use]
pub fn cleanup_budget(&self) -> Option<Budget> {
match &self.state {
TaskState::CancelRequested { cleanup_budget, .. }
| TaskState::Cancelling { cleanup_budget, .. }
| TaskState::Finalizing { cleanup_budget, .. } => Some(*cleanup_budget),
_ => None,
}
}
pub fn mark_local(&mut self) {
self.is_local = true;
}
pub fn pin_to_worker(&mut self, worker_id: usize) {
self.is_local = true;
self.pinned_worker = Some(worker_id);
}
#[must_use]
#[inline]
pub const fn is_local(&self) -> bool {
self.is_local
}
#[must_use]
#[inline]
pub const fn pinned_worker(&self) -> Option<usize> {
self.pinned_worker
}
#[must_use]
#[inline]
pub const fn is_in_queue(&self) -> bool {
self.queue_tag != 0
}
#[must_use]
#[inline]
pub const fn is_in_queue_tag(&self, tag: u8) -> bool {
self.queue_tag == tag
}
#[inline]
pub fn set_queue_links(&mut self, prev: Option<TaskId>, next: Option<TaskId>, tag: u8) {
self.prev_in_queue = prev;
self.next_in_queue = next;
self.queue_tag = tag;
}
#[inline]
pub fn clear_queue_links(&mut self) {
self.prev_in_queue = None;
self.next_in_queue = None;
self.queue_tag = 0;
}
pub fn decrement_mask(&mut self) -> Option<u32> {
if let Some(inner) = &self.cx_inner {
let mut guard = inner.write();
if guard.mask_depth > 0 {
guard.mask_depth -= 1;
return Some(guard.mask_depth);
}
}
None
}
pub fn increment_mask(&mut self) -> u32 {
if let Some(inner) = &self.cx_inner {
let mut guard = inner.write();
assert!(
guard.mask_depth < crate::types::task_context::MAX_MASK_DEPTH,
"mask depth exceeded MAX_MASK_DEPTH ({}): violates INV-MASK-BOUNDED",
crate::types::task_context::MAX_MASK_DEPTH,
);
guard.mask_depth += 1;
return guard.mask_depth;
}
0 }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::error::{Error, ErrorKind};
use crate::util::ArenaIndex;
use serde_json::{Value, json};
use std::sync::atomic::AtomicUsize;
fn init_test(name: &str) {
crate::test_utils::init_test_logging();
crate::test_phase!(name);
}
fn task() -> TaskId {
TaskId::from_arena(ArenaIndex::new(0, 0))
}
fn region() -> RegionId {
RegionId::from_arena(ArenaIndex::new(0, 0))
}
fn scrub_task_record_ids(value: Value) -> Value {
let mut scrubbed = value;
if let Some(task_id) = scrubbed.pointer_mut("/task_id") {
*task_id = json!("[TASK_ID]");
}
if let Some(region_id) = scrubbed.pointer_mut("/region_id") {
*region_id = json!("[REGION_ID]");
}
if let Some(origin_region) = scrubbed.pointer_mut("/reason/origin_region") {
*origin_region = json!("[REGION_ID]");
}
if let Some(origin_task) = scrubbed.pointer_mut("/reason/origin_task") {
*origin_task = json!("[TASK_ID]");
}
scrubbed
}
#[test]
fn task_phase_transitions_are_atomic() {
init_test("task_phase_transitions_are_atomic");
let mut t = TaskRecord::new(task(), region(), Budget::INFINITE);
crate::assert_with_log!(
t.phase() == TaskPhase::Created,
"phase created",
TaskPhase::Created,
t.phase()
);
let started = t.start_running();
crate::assert_with_log!(started, "start_running", true, started);
crate::assert_with_log!(
t.phase() == TaskPhase::Running,
"phase running",
TaskPhase::Running,
t.phase()
);
let requested = t.request_cancel(CancelReason::timeout());
crate::assert_with_log!(requested, "request_cancel", true, requested);
crate::assert_with_log!(
t.phase() == TaskPhase::CancelRequested,
"phase cancel requested",
TaskPhase::CancelRequested,
t.phase()
);
let ack = t.acknowledge_cancel();
crate::assert_with_log!(ack.is_some(), "acknowledge_cancel", true, ack.is_some());
crate::assert_with_log!(
t.phase() == TaskPhase::Cancelling,
"phase cancelling",
TaskPhase::Cancelling,
t.phase()
);
let cleaned = t.cleanup_done();
crate::assert_with_log!(cleaned, "cleanup_done", true, cleaned);
crate::assert_with_log!(
t.phase() == TaskPhase::Finalizing,
"phase finalizing",
TaskPhase::Finalizing,
t.phase()
);
let finalized = t.finalize_done();
crate::assert_with_log!(finalized, "finalize_done", true, finalized);
crate::assert_with_log!(
t.phase() == TaskPhase::Completed,
"phase completed",
TaskPhase::Completed,
t.phase()
);
crate::test_complete!("task_phase_transitions_are_atomic");
}
#[test]
fn wake_state_dedups_across_threads() {
init_test("wake_state_dedups_across_threads");
let state = Arc::new(TaskWakeState::new());
let successes = Arc::new(AtomicUsize::new(0));
let mut handles = Vec::new();
for _ in 0..8 {
let state = Arc::clone(&state);
let successes = Arc::clone(&successes);
handles.push(std::thread::spawn(move || {
if state.notify() {
successes.fetch_add(1, Ordering::SeqCst);
}
}));
}
for handle in handles {
handle.join().expect("thread join");
}
let count = successes.load(Ordering::SeqCst);
crate::assert_with_log!(count == 1, "single notify wins", 1usize, count);
let notified = state.is_notified();
crate::assert_with_log!(notified, "notified true", true, notified);
state.clear();
let cleared = state.is_notified();
crate::assert_with_log!(!cleared, "notified cleared", false, cleared);
crate::test_complete!("wake_state_dedups_across_threads");
}
#[test]
fn wake_state_tracks_wake_during_poll() {
init_test("wake_state_tracks_wake_during_poll");
let state = TaskWakeState::new();
state.begin_poll();
let woken = state.finish_poll();
crate::assert_with_log!(!woken, "no wake during poll", false, woken);
state.begin_poll();
let scheduled = state.notify();
crate::assert_with_log!(
!scheduled,
"wake during poll does not schedule",
false,
scheduled
);
let woken = state.finish_poll();
crate::assert_with_log!(woken, "wake observed after poll", true, woken);
let pending = state.is_notified();
crate::assert_with_log!(pending, "pending wake recorded", true, pending);
state.clear();
let cleared = state.is_notified();
crate::assert_with_log!(!cleared, "wake cleared", false, cleared);
crate::test_complete!("wake_state_tracks_wake_during_poll");
}
#[test]
fn cancel_before_first_poll_enters_cancel_requested() {
init_test("cancel_before_first_poll_enters_cancel_requested");
let mut t = TaskRecord::new(task(), region(), Budget::INFINITE);
let created = matches!(t.state, TaskState::Created);
crate::assert_with_log!(created, "created", true, created);
let requested = t.request_cancel(CancelReason::timeout());
crate::assert_with_log!(requested, "request_cancel", true, requested);
match &t.state {
TaskState::CancelRequested {
reason,
cleanup_budget: _,
} => {
crate::assert_with_log!(
reason.kind == crate::types::CancelKind::Timeout,
"reason kind",
crate::types::CancelKind::Timeout,
reason.kind
);
}
other => panic!("expected CancelRequested, got {other:?}"),
}
crate::test_complete!("cancel_before_first_poll_enters_cancel_requested");
}
#[test]
fn cancel_strengthens_idempotently_when_already_cancel_requested() {
init_test("cancel_strengthens_idempotently_when_already_cancel_requested");
let mut t = TaskRecord::new(task(), region(), Budget::INFINITE);
let first = t.request_cancel(CancelReason::timeout());
crate::assert_with_log!(first, "first cancel", true, first);
let second = t.request_cancel(CancelReason::shutdown());
crate::assert_with_log!(!second, "second cancel false", false, second);
match &t.state {
TaskState::CancelRequested { reason, .. } => {
crate::assert_with_log!(
reason.kind == crate::types::CancelKind::Shutdown,
"reason kind",
crate::types::CancelKind::Shutdown,
reason.kind
);
}
other => panic!("expected CancelRequested, got {other:?}"),
}
crate::test_complete!("cancel_strengthens_idempotently_when_already_cancel_requested");
}
#[test]
fn completed_is_absorbing() {
init_test("completed_is_absorbing");
let mut t = TaskRecord::new(task(), region(), Budget::INFINITE);
let completed = t.complete(Outcome::Ok(()));
crate::assert_with_log!(completed, "complete ok", true, completed);
let requested = t.request_cancel(CancelReason::timeout());
crate::assert_with_log!(!requested, "request_cancel false", false, requested);
let terminal = t.state.is_terminal();
crate::assert_with_log!(terminal, "terminal", true, terminal);
match &t.state {
TaskState::Completed(outcome) => {
let ok = matches!(outcome, Outcome::Ok(()));
crate::assert_with_log!(ok, "outcome ok", true, ok);
}
other => panic!("expected Completed, got {other:?}"),
}
crate::test_complete!("completed_is_absorbing");
}
#[test]
fn can_be_polled_matches_state() {
init_test("can_be_polled_matches_state");
let mut t = TaskRecord::new(task(), region(), Budget::INFINITE);
let can_poll = t.state.can_be_polled();
crate::assert_with_log!(!can_poll, "not pollable", false, can_poll);
let started = t.start_running();
crate::assert_with_log!(started, "start_running", true, started);
let can_poll = t.state.can_be_polled();
crate::assert_with_log!(can_poll, "pollable", true, can_poll);
let mut t = TaskRecord::new(task(), region(), Budget::INFINITE);
let _ = t.request_cancel_with_budget(CancelReason::timeout(), Budget::INFINITE);
let can_poll = t.state.can_be_polled();
crate::assert_with_log!(can_poll, "pollable after cancel", true, can_poll);
crate::test_complete!("can_be_polled_matches_state");
}
#[test]
fn complete_with_error_outcome() {
init_test("complete_with_error_outcome");
let mut t = TaskRecord::new(task(), region(), Budget::INFINITE);
let err = Error::new(ErrorKind::User);
let completed = t.complete(Outcome::Err(err));
crate::assert_with_log!(completed, "complete err", true, completed);
let terminal = t.state.is_terminal();
crate::assert_with_log!(terminal, "terminal", true, terminal);
crate::test_complete!("complete_with_error_outcome");
}
#[test]
fn complete_cancelled_without_prior_request_still_emits_witness() {
init_test("complete_cancelled_without_prior_request_still_emits_witness");
let mut t = TaskRecord::new(task(), region(), Budget::INFINITE);
let _ = t.start_running();
let completed = t.complete(Outcome::Cancelled(CancelReason::timeout()));
crate::assert_with_log!(completed, "complete cancelled", true, completed);
let witness = t.cancel_witness().expect("completed cancel witness");
crate::assert_with_log!(witness.epoch == 1, "epoch initialized", 1, witness.epoch);
crate::assert_with_log!(
witness.phase == CancelPhase::Completed,
"phase completed",
CancelPhase::Completed,
witness.phase
);
CancelWitness::validate_transition(None, &witness)
.expect("terminal cancelled witness is self-consistent");
crate::test_complete!("complete_cancelled_without_prior_request_still_emits_witness");
}
#[test]
fn complete_ok_after_cancel_request_becomes_cancelled() {
init_test("complete_ok_after_cancel_request_becomes_cancelled");
let mut t = TaskRecord::new(task(), region(), Budget::INFINITE);
let requested = t.request_cancel(CancelReason::timeout());
crate::assert_with_log!(requested, "request_cancel", true, requested);
let completed = t.complete(Outcome::Ok(()));
crate::assert_with_log!(completed, "complete ok", true, completed);
match &t.state {
TaskState::Completed(Outcome::Cancelled(reason)) => {
crate::assert_with_log!(
reason.kind == crate::types::CancelKind::Timeout,
"cancel reason preserved",
crate::types::CancelKind::Timeout,
reason.kind
);
}
other => panic!("expected Completed(Cancelled), got {other:?}"),
}
let witness = t
.cancel_witness()
.expect("cancel witness after coerced completion");
crate::assert_with_log!(
witness.phase == CancelPhase::Completed,
"phase completed",
CancelPhase::Completed,
witness.phase
);
crate::test_complete!("complete_ok_after_cancel_request_becomes_cancelled");
}
#[test]
fn complete_err_after_cancel_request_becomes_cancelled() {
init_test("complete_err_after_cancel_request_becomes_cancelled");
let mut t = TaskRecord::new(task(), region(), Budget::INFINITE);
let requested = t.request_cancel(CancelReason::timeout());
crate::assert_with_log!(requested, "request_cancel", true, requested);
let err = Error::new(ErrorKind::User);
let completed = t.complete(Outcome::Err(err));
crate::assert_with_log!(completed, "complete err", true, completed);
match &t.state {
TaskState::Completed(Outcome::Cancelled(reason)) => {
crate::assert_with_log!(
reason.kind == crate::types::CancelKind::Timeout,
"cancel reason preserved",
crate::types::CancelKind::Timeout,
reason.kind
);
}
other => panic!("expected Completed(Cancelled), got {other:?}"),
}
let witness = t
.cancel_witness()
.expect("cancel witness after coerced completion");
crate::assert_with_log!(
witness.phase == CancelPhase::Completed,
"phase completed",
CancelPhase::Completed,
witness.phase
);
crate::test_complete!("complete_err_after_cancel_request_becomes_cancelled");
}
#[test]
fn complete_ok_during_cancellation_cleanup_becomes_cancelled() {
init_test("complete_ok_during_cancellation_cleanup_becomes_cancelled");
let mut t = TaskRecord::new(task(), region(), Budget::INFINITE);
let _ = t.request_cancel(CancelReason::timeout());
let _ = t.acknowledge_cancel();
let completed = t.complete(Outcome::Ok(()));
crate::assert_with_log!(completed, "complete ok", true, completed);
let cancelled = matches!(t.state, TaskState::Completed(Outcome::Cancelled(_)));
crate::assert_with_log!(cancelled, "completed cancelled", true, cancelled);
let witness = t
.cancel_witness()
.expect("cancel witness during cleanup completion");
crate::assert_with_log!(
witness.phase == CancelPhase::Completed,
"phase completed",
CancelPhase::Completed,
witness.phase
);
crate::test_complete!("complete_ok_during_cancellation_cleanup_becomes_cancelled");
}
#[test]
fn complete_cancelled_during_protocol_does_not_weaken_reason() {
init_test("complete_cancelled_during_protocol_does_not_weaken_reason");
let mut t = TaskRecord::new(task(), region(), Budget::INFINITE);
let _ = t.request_cancel(CancelReason::timeout());
let completed = t.complete(Outcome::Cancelled(CancelReason::user("soft")));
crate::assert_with_log!(completed, "complete cancelled", true, completed);
match &t.state {
TaskState::Completed(Outcome::Cancelled(reason)) => {
crate::assert_with_log!(
reason.kind == crate::types::CancelKind::Timeout,
"cancel reason stayed strongest",
crate::types::CancelKind::Timeout,
reason.kind
);
}
other => panic!("expected Completed(Cancelled), got {other:?}"),
}
let witness = t.cancel_witness().expect("cancel witness after completion");
crate::assert_with_log!(
witness.reason.kind == crate::types::CancelKind::Timeout,
"witness reason stayed strongest",
crate::types::CancelKind::Timeout,
witness.reason.kind
);
crate::test_complete!("complete_cancelled_during_protocol_does_not_weaken_reason");
}
#[test]
fn complete_ok_during_finalization_becomes_cancelled() {
init_test("complete_ok_during_finalization_becomes_cancelled");
let mut t = TaskRecord::new(task(), region(), Budget::INFINITE);
let _ = t.request_cancel(CancelReason::timeout());
let _ = t.acknowledge_cancel();
let _ = t.cleanup_done();
let completed = t.complete(Outcome::Ok(()));
crate::assert_with_log!(completed, "complete ok", true, completed);
let cancelled = matches!(t.state, TaskState::Completed(Outcome::Cancelled(_)));
crate::assert_with_log!(cancelled, "completed cancelled", true, cancelled);
let witness = t
.cancel_witness()
.expect("cancel witness during finalization completion");
crate::assert_with_log!(
witness.phase == CancelPhase::Completed,
"phase completed",
CancelPhase::Completed,
witness.phase
);
crate::test_complete!("complete_ok_during_finalization_becomes_cancelled");
}
#[test]
fn acknowledge_cancel_transitions_to_cancelling() {
init_test("acknowledge_cancel_transitions_to_cancelling");
let mut t = TaskRecord::new(task(), region(), Budget::INFINITE);
let _ = t.request_cancel(CancelReason::timeout());
let reason = t.acknowledge_cancel();
let has_reason = reason.is_some();
crate::assert_with_log!(has_reason, "reason present", true, has_reason);
let kind = reason.unwrap().kind;
crate::assert_with_log!(
kind == crate::types::CancelKind::Timeout,
"reason kind",
crate::types::CancelKind::Timeout,
kind
);
let cancelling = matches!(
t.state,
TaskState::Cancelling {
reason: CancelReason {
kind: crate::types::CancelKind::Timeout,
..
},
..
}
);
crate::assert_with_log!(cancelling, "state cancelling", true, cancelling);
crate::test_complete!("acknowledge_cancel_transitions_to_cancelling");
}
#[test]
fn acknowledge_cancel_fails_for_wrong_state() {
init_test("acknowledge_cancel_fails_for_wrong_state");
let mut t = TaskRecord::new(task(), region(), Budget::INFINITE);
let none = t.acknowledge_cancel().is_none();
crate::assert_with_log!(none, "none in created", true, none);
t.start_running();
let none = t.acknowledge_cancel().is_none();
crate::assert_with_log!(none, "none in running", true, none);
crate::test_complete!("acknowledge_cancel_fails_for_wrong_state");
}
#[test]
fn cleanup_done_transitions_to_finalizing() {
init_test("cleanup_done_transitions_to_finalizing");
let mut t = TaskRecord::new(task(), region(), Budget::INFINITE);
let _ = t.request_cancel(CancelReason::timeout());
let _ = t.acknowledge_cancel();
let cancelling = matches!(t.state, TaskState::Cancelling { .. });
crate::assert_with_log!(cancelling, "state cancelling", true, cancelling);
let cleanup = t.cleanup_done();
crate::assert_with_log!(cleanup, "cleanup_done", true, cleanup);
let finalizing = matches!(t.state, TaskState::Finalizing { .. });
crate::assert_with_log!(finalizing, "state finalizing", true, finalizing);
crate::test_complete!("cleanup_done_transitions_to_finalizing");
}
#[test]
fn cleanup_done_fails_for_wrong_state() {
init_test("cleanup_done_fails_for_wrong_state");
let mut t = TaskRecord::new(task(), region(), Budget::INFINITE);
let cleanup = t.cleanup_done();
crate::assert_with_log!(!cleanup, "cleanup_done false", false, cleanup);
let _ = t.request_cancel(CancelReason::timeout());
let cleanup = t.cleanup_done();
crate::assert_with_log!(!cleanup, "cleanup_done false", false, cleanup);
crate::test_complete!("cleanup_done_fails_for_wrong_state");
}
#[test]
fn finalize_done_transitions_to_completed_cancelled() {
init_test("finalize_done_transitions_to_completed_cancelled");
let mut t = TaskRecord::new(task(), region(), Budget::INFINITE);
let _ = t.request_cancel(CancelReason::timeout());
let _ = t.acknowledge_cancel();
let _ = t.cleanup_done();
let finalizing = matches!(t.state, TaskState::Finalizing { .. });
crate::assert_with_log!(finalizing, "state finalizing", true, finalizing);
let finalized = t.finalize_done();
crate::assert_with_log!(finalized, "finalize_done", true, finalized);
let terminal = t.state.is_terminal();
crate::assert_with_log!(terminal, "terminal", true, terminal);
match &t.state {
TaskState::Completed(Outcome::Cancelled(reason)) => {
crate::assert_with_log!(
reason.kind == crate::types::CancelKind::Timeout,
"reason kind",
crate::types::CancelKind::Timeout,
reason.kind
);
}
other => panic!("expected Completed(Cancelled), got {other:?}"),
}
crate::test_complete!("finalize_done_transitions_to_completed_cancelled");
}
#[test]
fn full_cancellation_protocol_flow() {
init_test("full_cancellation_protocol_flow");
let mut t = TaskRecord::new(task(), region(), Budget::INFINITE);
let created = matches!(t.state, TaskState::Created);
crate::assert_with_log!(created, "created", true, created);
let requested = t.request_cancel(CancelReason::user("stop"));
crate::assert_with_log!(requested, "request_cancel", true, requested);
let requested_state = matches!(t.state, TaskState::CancelRequested { .. });
crate::assert_with_log!(
requested_state,
"state cancel requested",
true,
requested_state
);
let cancelling = t.state.is_cancelling();
crate::assert_with_log!(cancelling, "state cancelling", true, cancelling);
let reason = t.acknowledge_cancel().expect("should acknowledge");
crate::assert_with_log!(
reason.kind == crate::types::CancelKind::User,
"reason kind",
crate::types::CancelKind::User,
reason.kind
);
let cancelling = matches!(t.state, TaskState::Cancelling { .. });
crate::assert_with_log!(cancelling, "state cancelling", true, cancelling);
let cleanup = t.cleanup_done();
crate::assert_with_log!(cleanup, "cleanup_done", true, cleanup);
let finalizing = matches!(t.state, TaskState::Finalizing { .. });
crate::assert_with_log!(finalizing, "state finalizing", true, finalizing);
let finalized = t.finalize_done();
crate::assert_with_log!(finalized, "finalize_done", true, finalized);
let terminal = t.state.is_terminal();
crate::assert_with_log!(terminal, "terminal", true, terminal);
let cancelled = matches!(t.state, TaskState::Completed(Outcome::Cancelled(_)));
crate::assert_with_log!(cancelled, "cancelled", true, cancelled);
crate::test_complete!("full_cancellation_protocol_flow");
}
#[test]
fn cancellation_witness_sequence_is_monotone() {
init_test("cancellation_witness_sequence_is_monotone");
let mut t = TaskRecord::new(task(), region(), Budget::INFINITE);
t.start_running();
let _ = t.request_cancel(CancelReason::timeout());
let w1 = t.cancel_witness().expect("requested witness");
let _ = t.acknowledge_cancel();
let w2 = t.cancel_witness().expect("cancelling witness");
CancelWitness::validate_transition(Some(&w1), &w2).expect("requested -> cancelling");
let _ = t.cleanup_done();
let w3 = t.cancel_witness().expect("finalizing witness");
CancelWitness::validate_transition(Some(&w2), &w3).expect("cancelling -> finalizing");
let w4 = t.finalize_done_with_witness().expect("completed witness");
CancelWitness::validate_transition(Some(&w3), &w4).expect("finalizing -> completed");
crate::test_complete!("cancellation_witness_sequence_is_monotone");
}
#[test]
fn cancellation_witness_idempotent_requests() {
init_test("cancellation_witness_idempotent_requests");
let mut t = TaskRecord::new(task(), region(), Budget::INFINITE);
t.start_running();
let _ = t.request_cancel(CancelReason::timeout());
let w1 = t.cancel_witness().expect("first witness");
let _ = t.request_cancel(CancelReason::shutdown());
let w2 = t.cancel_witness().expect("second witness");
crate::assert_with_log!(w1.epoch == w2.epoch, "epoch stable", w1.epoch, w2.epoch);
CancelWitness::validate_transition(Some(&w1), &w2).expect("idempotent request transition");
crate::test_complete!("cancellation_witness_idempotent_requests");
}
#[test]
fn cancellation_witness_rejects_out_of_order() {
init_test("cancellation_witness_rejects_out_of_order");
let mut t = TaskRecord::new(task(), region(), Budget::INFINITE);
t.start_running();
let _ = t.request_cancel(CancelReason::timeout());
let requested = t.cancel_witness().expect("requested witness");
let _ = t.acknowledge_cancel();
let _ = t.cleanup_done();
let completed = t.finalize_done_with_witness().expect("completed witness");
let err = CancelWitness::validate_transition(Some(&completed), &requested).err();
crate::assert_with_log!(err.is_some(), "out of order rejected", true, err.is_some());
crate::test_complete!("cancellation_witness_rejects_out_of_order");
}
#[test]
fn masking_operations() {
init_test("masking_operations");
let mut t = TaskRecord::new(task(), region(), Budget::INFINITE);
let inner = Arc::new(RwLock::new(CxInner::new(
region(),
task(),
Budget::INFINITE,
)));
t.set_cx_inner(inner);
let mask1 = t.increment_mask();
crate::assert_with_log!(mask1 == 1, "mask 1", 1, mask1);
let mask2 = t.increment_mask();
crate::assert_with_log!(mask2 == 2, "mask 2", 2, mask2);
let dec1 = t.decrement_mask();
crate::assert_with_log!(dec1 == Some(1), "dec 1", Some(1), dec1);
let dec0 = t.decrement_mask();
crate::assert_with_log!(dec0 == Some(0), "dec 0", Some(0), dec0);
let dec_none = t.decrement_mask();
crate::assert_with_log!(dec_none.is_none(), "dec none", true, dec_none.is_none());
crate::test_complete!("masking_operations");
}
#[test]
fn cleanup_budget_accessor() {
init_test("cleanup_budget_accessor");
let mut t = TaskRecord::new(task(), region(), Budget::INFINITE);
let none = t.cleanup_budget().is_none();
crate::assert_with_log!(none, "no budget", true, none);
let _ = t.request_cancel_with_budget(
CancelReason::timeout(),
Budget::new().with_poll_quota(500),
);
let budget = t.cleanup_budget().expect("should have cleanup budget");
crate::assert_with_log!(
budget.poll_quota == 500,
"poll_quota",
500,
budget.poll_quota
);
crate::test_complete!("cleanup_budget_accessor");
}
#[test]
fn request_cancel_updates_shared_cx() {
init_test("request_cancel_updates_shared_cx");
let mut t = TaskRecord::new(task(), region(), Budget::INFINITE);
let inner = Arc::new(RwLock::new(CxInner::new(
region(),
task(),
Budget::INFINITE,
)));
t.set_cx_inner(inner.clone());
let cancel_requested = inner.read().cancel_requested;
crate::assert_with_log!(
!cancel_requested,
"cancel_requested false",
false,
cancel_requested
);
let cancel_reason_none = inner.read().cancel_reason.is_none();
crate::assert_with_log!(
cancel_reason_none,
"cancel_reason none",
true,
cancel_reason_none
);
t.request_cancel(CancelReason::timeout());
let cancel_requested = inner.read().cancel_requested;
crate::assert_with_log!(
cancel_requested,
"cancel_requested true",
true,
cancel_requested
);
let cancel_reason = inner.read().cancel_reason.clone();
crate::assert_with_log!(
cancel_reason == Some(CancelReason::timeout()),
"cancel_reason",
Some(CancelReason::timeout()),
cancel_reason
);
let requested_state = matches!(t.state, TaskState::CancelRequested { .. });
crate::assert_with_log!(
requested_state,
"state cancel requested",
true,
requested_state
);
crate::test_complete!("request_cancel_updates_shared_cx");
}
#[test]
fn task_record_cancel_witness_snapshot_scrubs_ids() {
init_test("task_record_cancel_witness_snapshot_scrubs_ids");
let mut record = TaskRecord::new(
TaskId::new_for_test(4, 2),
RegionId::new_for_test(8, 1),
Budget::new().with_poll_quota(5),
);
let requested = record.request_cancel(
CancelReason::linked_exit()
.with_region(RegionId::new_for_test(77, 6))
.with_task(TaskId::new_for_test(11, 5))
.with_timestamp(Time::from_nanos(44))
.with_message("peer closed"),
);
crate::assert_with_log!(requested, "request_cancel", true, requested);
insta::assert_json_snapshot!(
"task_record_cancel_witness_scrubbed_ids",
scrub_task_record_ids(
serde_json::to_value(record.cancel_witness().expect("cancel witness"))
.expect("serialize witness")
)
);
crate::test_complete!("task_record_cancel_witness_snapshot_scrubs_ids");
}
use TaskPhase::*;
#[test]
fn valid_transitions_accepted() {
init_test("valid_transitions_accepted");
let valid = [
(Created, Running),
(Created, CancelRequested),
(Created, Completed), (Running, CancelRequested),
(Running, Completed),
(CancelRequested, CancelRequested), (CancelRequested, Cancelling),
(CancelRequested, Completed), (Cancelling, Cancelling), (Cancelling, Finalizing),
(Cancelling, Completed), (Finalizing, Finalizing), (Finalizing, Completed),
];
for (from, to) in valid {
crate::assert_with_log!(
from.is_valid_transition(to),
"transition should be valid",
true,
(from, to)
);
}
crate::test_complete!("valid_transitions_accepted");
}
#[test]
fn invalid_transitions_rejected() {
init_test("invalid_transitions_rejected");
let invalid = [
(Running, Created),
(CancelRequested, Running),
(CancelRequested, Created),
(Cancelling, CancelRequested),
(Cancelling, Running),
(Cancelling, Created),
(Finalizing, Cancelling),
(Finalizing, CancelRequested),
(Finalizing, Running),
(Finalizing, Created),
(Created, Cancelling),
(Created, Finalizing),
(Running, Cancelling),
(Running, Finalizing),
(CancelRequested, Finalizing),
(Completed, Created),
(Completed, Running),
(Completed, CancelRequested),
(Completed, Cancelling),
(Completed, Finalizing),
(Completed, Completed),
];
for (from, to) in invalid {
crate::assert_with_log!(
!from.is_valid_transition(to),
"transition should be invalid",
false,
(from, to)
);
}
crate::test_complete!("invalid_transitions_rejected");
}
#[test]
fn transition_table_is_exhaustive() {
init_test("transition_table_is_exhaustive");
let phases = [
Created,
Running,
CancelRequested,
Cancelling,
Finalizing,
Completed,
];
let mut valid_count = 0;
let mut invalid_count = 0;
for from in phases {
for to in phases {
if from.is_valid_transition(to) {
valid_count += 1;
} else {
invalid_count += 1;
}
}
}
crate::assert_with_log!(
valid_count == 13,
"valid transitions count",
13,
valid_count
);
crate::assert_with_log!(
invalid_count == 23,
"invalid transitions count",
23,
invalid_count
);
crate::test_complete!("transition_table_is_exhaustive");
}
}