use core::fmt;
use parking_lot::RwLock;
use smallvec::SmallVec;
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use crate::types::symbol::{ObjectId, Symbol};
use crate::types::{Budget, CancelAttributionConfig, CancelKind, CancelReason, Time};
use crate::util::DetRng;
fn cancel_kind_to_u8(kind: CancelKind) -> u8 {
match kind {
CancelKind::User => 0,
CancelKind::Timeout => 1,
CancelKind::Deadline => 2,
CancelKind::PollQuota => 3,
CancelKind::CostBudget => 4,
CancelKind::FailFast => 5,
CancelKind::RaceLost => 6,
CancelKind::ParentCancelled => 7,
CancelKind::ResourceUnavailable => 8,
CancelKind::Shutdown => 9,
CancelKind::LinkedExit => 10,
}
}
/// Decodes a wire byte produced by `cancel_kind_to_u8`.
///
/// Returns `None` for any code outside `0..=10`.
fn cancel_kind_from_u8(b: u8) -> Option<CancelKind> {
    // Position in this table == wire code.
    const BY_CODE: [CancelKind; 11] = [
        CancelKind::User,
        CancelKind::Timeout,
        CancelKind::Deadline,
        CancelKind::PollQuota,
        CancelKind::CostBudget,
        CancelKind::FailFast,
        CancelKind::RaceLost,
        CancelKind::ParentCancelled,
        CancelKind::ResourceUnavailable,
        CancelKind::Shutdown,
        CancelKind::LinkedExit,
    ];
    BY_CODE.get(usize::from(b)).copied()
}
/// Callback notified when a [`SymbolCancelToken`] is cancelled, and again when a
/// retained listener's recorded severity is below a strengthened reason's severity.
pub trait CancelListener: Send + Sync {
    /// Invoked with the current cancellation reason and the token's recorded
    /// cancellation time (`Time::ZERO` when no timestamp has been recorded).
    fn on_cancel(&self, reason: &CancelReason, at: Time);
}
/// Blanket impl so any thread-safe `Fn(&CancelReason, Time)` closure can be
/// registered directly as a listener.
impl<F> CancelListener for F
where
    F: Fn(&CancelReason, Time) + Send + Sync,
{
    fn on_cancel(&self, reason: &CancelReason, at: Time) {
        self(reason, at);
    }
}
/// Shared state behind every clone of a [`SymbolCancelToken`] (held in an `Arc`).
struct CancelTokenState {
    // Identifier drawn from the caller's `DetRng`, or decoded from the wire.
    token_id: u64,
    // Object this token governs; children inherit it.
    object_id: ObjectId,
    // Flipped false -> true by the first successful `cancel` (or initialised
    // true by `from_bytes`); no code in this section resets it.
    cancelled: AtomicBool,
    // Cancellation time in nanoseconds; `u64::MAX` is the "not yet recorded"
    // sentinel. It is published while the `reason` write lock is held.
    cancelled_at: AtomicU64,
    // Current (possibly strengthened) cancellation reason. Its write lock also
    // serialises `cancel`, `add_listener`, and the listener catch-up loop.
    reason: RwLock<Option<CancelReason>>,
    // Value returned by `cleanup_budget()`.
    cleanup_budget: Budget,
    // Child tokens to cascade into; drained by the first successful `cancel`.
    children: RwLock<SmallVec<[SymbolCancelToken; 2]>>,
    // Listeners retained so they can be re-notified on reason strengthening.
    listeners: RwLock<SmallVec<[ListenerEntry; 2]>>,
    // Count of listener callbacks whose panic was caught.
    listener_panic_count: AtomicU64,
}
/// A retained listener together with the highest reason severity it has
/// already been notified about (0 when it has not been notified yet).
struct ListenerEntry {
    listener: Box<dyn CancelListener>,
    // Compared against a reason's `kind.severity()` to skip re-notifying a
    // listener that has already seen an equal or stronger reason.
    notified_severity: u8,
}
/// Cancellation token for a symbol/object. Cloning is cheap: all clones share
/// one `CancelTokenState` through an `Arc`, so cancelling any clone cancels all.
#[derive(Clone)]
pub struct SymbolCancelToken {
    state: Arc<CancelTokenState>,
}
impl SymbolCancelToken {
    /// Single place that builds a token's shared state so every constructor
    /// starts from the same baseline: no recorded time (`u64::MAX` sentinel),
    /// no reason, no children, no listeners, and a zero panic counter.
    fn from_parts(
        token_id: u64,
        object_id: ObjectId,
        cancelled: bool,
        cleanup_budget: Budget,
    ) -> Self {
        Self {
            state: Arc::new(CancelTokenState {
                token_id,
                object_id,
                cancelled: AtomicBool::new(cancelled),
                cancelled_at: AtomicU64::new(u64::MAX),
                reason: RwLock::new(None),
                cleanup_budget,
                children: RwLock::new(SmallVec::new()),
                listeners: RwLock::new(SmallVec::new()),
                listener_panic_count: AtomicU64::new(0),
            }),
        }
    }

    /// Creates an uncancelled token for `object_id` with `Budget::default()` as
    /// its cleanup budget. The token id is drawn from `rng`.
    #[must_use]
    pub fn new(object_id: ObjectId, rng: &mut DetRng) -> Self {
        Self::with_budget(object_id, Budget::default(), rng)
    }

    /// Creates an uncancelled token for `object_id` carrying `budget` as its
    /// cleanup budget. The token id is drawn from `rng`.
    #[must_use]
    pub fn with_budget(object_id: ObjectId, budget: Budget, rng: &mut DetRng) -> Self {
        Self::from_parts(rng.next_u64(), object_id, false, budget)
    }

    /// Number of listener callbacks on this token that panicked (the panics are
    /// caught and counted rather than propagated).
    #[must_use]
    pub fn listener_panic_count(&self) -> u64 {
        self.state.listener_panic_count.load(Ordering::Relaxed)
    }

    /// Counts a caught listener panic and, with the `tracing-integration`
    /// feature, logs its message.
    fn record_listener_panic(
        state: &CancelTokenState,
        panic_payload: Box<dyn std::any::Any + Send>,
    ) {
        state.listener_panic_count.fetch_add(1, Ordering::Relaxed);
        #[cfg(feature = "tracing-integration")]
        {
            // Logging is itself guarded so a misbehaving subscriber cannot turn a
            // caught listener panic into a fresh one.
            let _trace_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                let panic_msg = if let Some(s) = panic_payload.downcast_ref::<&str>() {
                    (*s).to_string()
                } else if let Some(s) = panic_payload.downcast_ref::<String>() {
                    s.clone()
                } else {
                    "<non-string panic payload>".to_string()
                };
                tracing::warn!(
                    object_id = ?state.object_id,
                    token_id = state.token_id,
                    panic = %panic_msg,
                    "cancel listener panicked during on_cancel — caught and logged \
                     instead of silently swallowed (br-asupersync-mzamuo)"
                );
            }));
        }
        #[cfg(not(feature = "tracing-integration"))]
        {
            let _ = panic_payload;
        }
    }

    /// Invokes a borrowed listener, catching and recording any panic.
    fn notify_listener_with_panic_logging(
        state: &CancelTokenState,
        listener: &dyn CancelListener,
        reason: &CancelReason,
        now: Time,
    ) {
        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            listener.on_cancel(reason, now);
        }));
        if let Err(panic_payload) = result {
            Self::record_listener_panic(state, panic_payload);
        }
    }

    /// Invokes an owned listener once and drops it, both inside `catch_unwind`,
    /// so a panicking `Drop` is recorded as well.
    fn notify_owned_listener_with_panic_logging(
        state: &CancelTokenState,
        listener: Box<dyn CancelListener>,
        reason: &CancelReason,
        now: Time,
    ) {
        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
            listener.on_cancel(reason, now);
            drop(listener);
        }));
        if let Err(panic_payload) = result {
            Self::record_listener_panic(state, panic_payload);
        }
    }

    /// Notifies retained listeners about `target_reason`, then keeps catching up
    /// while the stored reason keeps getting stronger, and finally puts the
    /// listeners back.
    ///
    /// Listeners are moved out of the lock while callbacks run, so callbacks never
    /// execute while `listeners` is locked. `force_target_notification` notifies
    /// every retained listener regardless of its recorded severity (used on the
    /// first cancellation).
    fn notify_retained_listeners_until_current(
        state: &CancelTokenState,
        target_reason: &CancelReason,
        target_severity: u8,
        force_target_notification: bool,
    ) {
        let notify_at_nanos = state.cancelled_at.load(Ordering::Acquire);
        let notify_at = if notify_at_nanos == u64::MAX {
            Time::ZERO
        } else {
            Time::from_nanos(notify_at_nanos)
        };
        let mut retained = {
            let mut listeners = state.listeners.write();
            std::mem::take(&mut *listeners)
        };
        for entry in &mut retained {
            if force_target_notification || entry.notified_severity < target_severity {
                Self::notify_listener_with_panic_logging(
                    state,
                    entry.listener.as_ref(),
                    target_reason,
                    notify_at,
                );
                entry.notified_severity = target_severity;
            }
        }
        // Another thread may strengthen the reason while callbacks run. The put-back
        // happens while the reason write lock is held, so a strengthen either lands
        // before the check (and is caught up here) or after the put-back (and then
        // takes the listeners itself).
        const MAX_CATCH_UP_ITERATIONS: u32 = 8;
        for iteration in 0..MAX_CATCH_UP_ITERATIONS {
            let reason_guard = state.reason.write();
            let Some(current_reason) = reason_guard.clone() else {
                let mut listeners = state.listeners.write();
                listeners.extend(retained);
                return;
            };
            let current_severity = current_reason.kind.severity();
            if retained
                .iter()
                .all(|entry| entry.notified_severity >= current_severity)
            {
                let mut listeners = state.listeners.write();
                listeners.extend(retained);
                return;
            }
            drop(reason_guard);
            for entry in &mut retained {
                if entry.notified_severity < current_severity {
                    Self::notify_listener_with_panic_logging(
                        state,
                        entry.listener.as_ref(),
                        &current_reason,
                        notify_at,
                    );
                    entry.notified_severity = current_severity;
                }
            }
            if iteration < MAX_CATCH_UP_ITERATIONS - 1 {
                std::hint::spin_loop();
            }
        }
        // Iteration cap reached: do one last catch-up pass and restore the listeners.
        let final_reason = {
            let reason_guard = state.reason.write();
            reason_guard
                .clone()
                .unwrap_or_else(CancelReason::parent_cancelled)
        };
        let final_severity = final_reason.kind.severity();
        for entry in &mut retained {
            if entry.notified_severity < final_severity {
                Self::notify_listener_with_panic_logging(
                    state,
                    entry.listener.as_ref(),
                    &final_reason,
                    notify_at,
                );
                entry.notified_severity = final_severity;
            }
        }
        let mut listeners = state.listeners.write();
        listeners.extend(retained);
    }

    /// Identifier of this token.
    #[inline]
    #[must_use]
    pub fn token_id(&self) -> u64 {
        self.state.token_id
    }

    /// Object governed by this token.
    #[inline]
    #[must_use]
    pub fn object_id(&self) -> ObjectId {
        self.state.object_id
    }

    /// Whether the token has been cancelled.
    #[inline]
    #[must_use]
    pub fn is_cancelled(&self) -> bool {
        self.state.cancelled.load(Ordering::Acquire)
    }

    /// Snapshot of the current cancellation reason, if one is stored.
    #[must_use]
    pub fn reason(&self) -> Option<CancelReason> {
        self.state.reason.read().clone()
    }

    /// Recorded cancellation time.
    ///
    /// Returns `None` when the token is not cancelled. It also returns `None` when
    /// the token is cancelled but no time was ever recorded, for example a token
    /// decoded by `from_bytes` as already cancelled.
    #[inline]
    #[must_use]
    pub fn cancelled_at(&self) -> Option<Time> {
        let nanos = self.state.cancelled_at.load(Ordering::Acquire);
        if nanos != u64::MAX {
            return Some(Time::from_nanos(nanos));
        }
        if !self.is_cancelled() {
            return None;
        }
        // `cancel` sets the flag and publishes the timestamp while holding the
        // reason write lock. Taking the read lock waits for that to finish.
        let _guard = self.state.reason.read();
        let nanos_sync = self.state.cancelled_at.load(Ordering::Acquire);
        if nanos_sync == u64::MAX {
            None
        } else {
            Some(Time::from_nanos(nanos_sync))
        }
    }

    /// Cleanup budget attached at construction.
    #[must_use]
    pub fn cleanup_budget(&self) -> Budget {
        self.state.cleanup_budget
    }

    /// `ParentCancelled` reason stamped at `at`, with `parent_reason` attached as
    /// its cause (limited by the default attribution config).
    fn parent_cancelled_with_cause(parent_reason: &CancelReason, at: Time) -> CancelReason {
        CancelReason::parent_cancelled()
            .with_timestamp(at)
            .with_cause_limited(parent_reason.clone(), &CancelAttributionConfig::default())
    }

    /// Reason used when cascading this token's cancellation into a child.
    fn parent_cascade_reason_at(&self, at: Time) -> CancelReason {
        self.state.reason.read().as_ref().map_or_else(
            || CancelReason::parent_cancelled().with_timestamp(at),
            |reason| Self::parent_cancelled_with_cause(reason, at),
        )
    }

    /// Cancels the token with `reason` at time `now`.
    ///
    /// If this call performs the first cancellation, it does the following and
    /// returns `true`:
    /// - records `now` (clamped below the `u64::MAX` sentinel) and the reason;
    /// - notifies every retained listener;
    /// - cascades a `ParentCancelled` reason to all registered children.
    ///
    /// If the token was already cancelled, the stored reason is strengthened with
    /// `reason`. Retained listeners are re-notified only if the severity rose, and
    /// the call returns `false`.
    #[allow(clippy::must_use_candidate)]
    pub fn cancel(&self, reason: &CancelReason, now: Time) -> bool {
        let mut reason_guard = self.state.reason.write();
        if self
            .state
            .cancelled
            .compare_exchange(false, true, Ordering::Release, Ordering::Acquire)
            .is_ok()
        {
            let stored_nanos = now.as_nanos().min(u64::MAX - 1);
            self.state
                .cancelled_at
                .store(stored_nanos, Ordering::Release);
            *reason_guard = Some(reason.clone());
            drop(reason_guard);
            let new_severity = reason.kind.severity();
            Self::notify_retained_listeners_until_current(&self.state, reason, new_severity, true);
            let children = {
                let mut children = self.state.children.write();
                std::mem::take(&mut *children)
            };
            let parent_reason = self.parent_cascade_reason_at(now);
            for child in children {
                child.cancel(&parent_reason, now);
            }
            true
        } else {
            let prior_severity;
            let strengthened_reason;
            if let Some(ref mut stored) = *reason_guard {
                prior_severity = stored.kind.severity();
                stored.strengthen(reason);
                strengthened_reason = stored.clone();
            } else {
                // Cancelled without a stored reason (e.g. decoded via `from_bytes`):
                // adopt this reason and, if still unset, this timestamp.
                prior_severity = 0;
                *reason_guard = Some(reason.clone());
                strengthened_reason = reason.clone();
                let stored_nanos = now.as_nanos().min(u64::MAX - 1);
                self.state
                    .cancelled_at
                    .compare_exchange(u64::MAX, stored_nanos, Ordering::Release, Ordering::Relaxed)
                    .ok();
            }
            let new_severity = strengthened_reason.kind.severity();
            drop(reason_guard);
            if new_severity > prior_severity {
                Self::notify_retained_listeners_until_current(
                    &self.state,
                    &strengthened_reason,
                    new_severity,
                    false,
                );
            }
            false
        }
    }

    /// Timestamp to cascade into a child created after this token was cancelled.
    ///
    /// Returns `None` only if the token is not cancelled. It falls back to
    /// `Time::ZERO` when no time was recorded or the retry budget runs out.
    fn cancelled_at_snapshot_for_child(&self) -> Option<Time> {
        if !self.is_cancelled() {
            return None;
        }
        const MAX_RETRIES: u32 = 1000;
        for _attempt in 0..MAX_RETRIES {
            let nanos = self.state.cancelled_at.load(Ordering::Acquire);
            if nanos != u64::MAX {
                return Some(Time::from_nanos(nanos));
            }
            if let Some(reason_guard) = self.state.reason.try_read() {
                if reason_guard.is_none() {
                    return Some(Time::ZERO);
                }
                let synced = self.state.cancelled_at.load(Ordering::Acquire);
                debug_assert_ne!(
                    synced,
                    u64::MAX,
                    "cancelled_at must be published before reason write lock is released"
                );
                return Some(if synced == u64::MAX {
                    Time::ZERO
                } else {
                    Time::from_nanos(synced)
                });
            }
            std::thread::sleep(std::time::Duration::from_nanos(100));
        }
        Some(Time::ZERO)
    }

    /// Creates a child token for the same object.
    ///
    /// If this token is still live, the child is registered and will be cancelled
    /// when this token is. If this token is already cancelled, the child is
    /// cancelled immediately with a `ParentCancelled` reason.
    #[must_use]
    pub fn child(&self, rng: &mut DetRng) -> Self {
        let child = Self::new(self.state.object_id, rng);
        let mut children = self.state.children.write();
        if !self.state.cancelled.load(Ordering::Acquire) {
            children.push(child.clone());
            return child;
        }
        drop(children);
        // `cancelled` only ever transitions false -> true, so the snapshot is
        // always `Some` here. The former `None` fallback path (re-registration
        // plus sleep backoff) was unreachable and has been removed.
        let at = self
            .cancelled_at_snapshot_for_child()
            .unwrap_or(Time::ZERO);
        let parent_reason = self.parent_cascade_reason_at(at);
        child.cancel(&parent_reason, at);
        child
    }

    /// Registers a cancellation listener.
    ///
    /// If the token is already cancelled, the listener is invoked once right away
    /// (after both locks are released) and is not retained. Otherwise it is kept
    /// and notified on cancellation and on later reason strengthening.
    pub fn add_listener(&self, listener: impl CancelListener + 'static) {
        let reason_guard = self.state.reason.write();
        let mut listeners = self.state.listeners.write();
        if self.state.cancelled.load(Ordering::Acquire) {
            let reason = reason_guard
                .clone()
                .unwrap_or_else(CancelReason::parent_cancelled);
            let at_nanos = self.state.cancelled_at.load(Ordering::Acquire);
            debug_assert!(
                at_nanos != u64::MAX || reason_guard.is_none(),
                "add_listener must not observe reason=Some(_) with unpublished cancelled_at"
            );
            let at = if at_nanos == u64::MAX {
                Time::ZERO
            } else {
                Time::from_nanos(at_nanos)
            };
            drop(listeners);
            drop(reason_guard);
            let boxed: Box<dyn CancelListener> = Box::new(listener);
            Self::notify_owned_listener_with_panic_logging(&self.state, boxed, &reason, at);
        } else {
            listeners.push(ListenerEntry {
                listener: Box::new(listener),
                notified_severity: 0,
            });
            drop(listeners);
            drop(reason_guard);
        }
    }

    /// Serialises the token as 25 bytes in this order:
    /// - token id (big-endian `u64`);
    /// - object id high and low halves (big-endian `u64` each);
    /// - a cancelled flag byte.
    #[must_use]
    pub fn to_bytes(&self) -> [u8; TOKEN_WIRE_SIZE] {
        let mut buf = [0u8; TOKEN_WIRE_SIZE];
        buf[0..8].copy_from_slice(&self.state.token_id.to_be_bytes());
        buf[8..16].copy_from_slice(&self.state.object_id.high().to_be_bytes());
        buf[16..24].copy_from_slice(&self.state.object_id.low().to_be_bytes());
        buf[24] = u8::from(self.is_cancelled());
        buf
    }

    /// Decodes a token produced by `to_bytes`.
    ///
    /// Returns `None` if `data` is shorter than the wire size. Only the id, object
    /// id and cancelled flag travel on the wire. The decoded token has no reason,
    /// no recorded time, no children, no listeners and `Budget::default()`.
    #[must_use]
    pub fn from_bytes(data: &[u8]) -> Option<Self> {
        if data.len() < TOKEN_WIRE_SIZE {
            return None;
        }
        let token_id = u64::from_be_bytes(data[0..8].try_into().ok()?);
        let high = u64::from_be_bytes(data[8..16].try_into().ok()?);
        let low = u64::from_be_bytes(data[16..24].try_into().ok()?);
        let cancelled = data[24] != 0;
        Some(Self::from_parts(
            token_id,
            ObjectId::new(high, low),
            cancelled,
            Budget::default(),
        ))
    }

    /// Test-only constructor with an explicit token id.
    #[doc(hidden)]
    #[must_use]
    #[cfg(test)]
    pub fn new_for_test(token_id: u64, object_id: ObjectId) -> Self {
        Self::from_parts(token_id, object_id, false, Budget::default())
    }
}
impl fmt::Debug for SymbolCancelToken {
    /// Shows the token id as 16 zero-padded hex digits, the object id, and the
    /// cancelled flag. Reason, listeners and children are deliberately omitted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let shared = &self.state;
        let id_hex = format!("{:016x}", shared.token_id);
        let mut out = f.debug_struct("SymbolCancelToken");
        out.field("token_id", &id_hex);
        out.field("object_id", &shared.object_id);
        out.field("cancelled", &self.is_cancelled());
        out.finish()
    }
}
/// Encoded size of a [`SymbolCancelToken`]: an 8-byte token id, two 8-byte object
/// id halves, and one cancelled-flag byte.
const TOKEN_WIRE_SIZE: usize = 25;
/// A cancellation notice exchanged between peers, with hop counting to bound
/// re-forwarding.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct CancelMessage {
    // Token the cancellation targets.
    token_id: u64,
    // Object the cancellation targets; receivers look tokens up by this id.
    object_id: ObjectId,
    // Kind used to rebuild a `CancelReason` on the receiving side.
    kind: CancelKind,
    // Time the cancellation was initiated.
    initiated_at: Time,
    // Sender-assigned sequence number, part of the duplicate-suppression key.
    sequence: u64,
    // Number of times this message has been forwarded.
    hops: u8,
    // Forwarding stops once `hops` reaches this value.
    max_hops: u8,
}
/// Encoded size of a [`CancelMessage`]. Fields in order, with byte widths:
/// token id (8), object id high (8), object id low (8), kind (1),
/// initiated_at (8), sequence (8), hops (1), max_hops (1).
const MESSAGE_WIRE_SIZE: usize = 43;
impl CancelMessage {
    /// Builds an unforwarded message (`hops == 0`) with a hop limit of 10.
    #[must_use]
    pub fn new(
        token_id: u64,
        object_id: ObjectId,
        kind: CancelKind,
        initiated_at: Time,
        sequence: u64,
    ) -> Self {
        let (hops, max_hops) = (0, 10);
        Self {
            token_id,
            object_id,
            kind,
            initiated_at,
            sequence,
            hops,
            max_hops,
        }
    }

    /// Targeted token id.
    #[inline]
    #[must_use]
    pub const fn token_id(&self) -> u64 {
        self.token_id
    }

    /// Targeted object id.
    #[inline]
    #[must_use]
    pub const fn object_id(&self) -> ObjectId {
        self.object_id
    }

    /// Cancellation kind carried by the message.
    #[inline]
    #[must_use]
    pub const fn kind(&self) -> CancelKind {
        self.kind
    }

    /// Time the cancellation was initiated.
    #[inline]
    #[must_use]
    pub const fn initiated_at(&self) -> Time {
        self.initiated_at
    }

    /// Sender-assigned sequence number.
    #[inline]
    #[must_use]
    pub const fn sequence(&self) -> u64 {
        self.sequence
    }

    /// How many times the message has been forwarded so far.
    #[inline]
    #[must_use]
    pub const fn hops(&self) -> u8 {
        self.hops
    }

    /// True while the hop limit still permits another forward.
    #[inline]
    #[must_use]
    pub const fn can_forward(&self) -> bool {
        self.max_hops > self.hops
    }

    /// Copy of this message with `hops` incremented, or `None` once the hop limit
    /// is reached. `can_forward` guarantees the increment cannot overflow.
    #[must_use]
    pub fn forwarded(&self) -> Option<Self> {
        self.can_forward().then(|| Self {
            hops: self.hops + 1,
            ..self.clone()
        })
    }

    /// Returns the message with its hop limit replaced by `max`.
    #[inline]
    #[must_use]
    pub const fn with_max_hops(mut self, max: u8) -> Self {
        self.max_hops = max;
        self
    }

    /// Encodes the message into its fixed 43-byte big-endian layout.
    #[must_use]
    pub fn to_bytes(&self) -> [u8; MESSAGE_WIRE_SIZE] {
        let mut out = [0u8; MESSAGE_WIRE_SIZE];
        let mut cursor = 0;
        for word in [self.token_id, self.object_id.high(), self.object_id.low()] {
            out[cursor..cursor + 8].copy_from_slice(&word.to_be_bytes());
            cursor += 8;
        }
        out[cursor] = cancel_kind_to_u8(self.kind);
        cursor += 1;
        for word in [self.initiated_at.as_nanos(), self.sequence] {
            out[cursor..cursor + 8].copy_from_slice(&word.to_be_bytes());
            cursor += 8;
        }
        out[cursor] = self.hops;
        out[cursor + 1] = self.max_hops;
        out
    }

    /// Decodes a message produced by `to_bytes`.
    ///
    /// Returns `None` if `data` is shorter than 43 bytes or the kind byte is
    /// unknown. Trailing bytes beyond the wire size are ignored.
    #[must_use]
    pub fn from_bytes(data: &[u8]) -> Option<Self> {
        let frame = data.get(..MESSAGE_WIRE_SIZE)?;
        let read_word = |offset: usize| -> Option<u64> {
            <[u8; 8]>::try_from(&frame[offset..offset + 8])
                .ok()
                .map(u64::from_be_bytes)
        };
        let kind = cancel_kind_from_u8(frame[24])?;
        Some(Self {
            token_id: read_word(0)?,
            object_id: ObjectId::new(read_word(8)?, read_word(16)?),
            kind,
            initiated_at: Time::from_nanos(read_word(25)?),
            sequence: read_word(33)?,
            hops: frame[41],
            max_hops: frame[42],
        })
    }
}
/// Opaque string identifier of a peer that takes part in cancel broadcasts.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct PeerId(String);
impl PeerId {
    /// Wraps any string-like value as a peer id.
    #[inline]
    #[must_use]
    pub fn new(id: impl Into<String>) -> Self {
        let owned: String = id.into();
        Self(owned)
    }

    /// Borrows the identifier as a string slice.
    #[inline]
    #[must_use]
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}
/// Transport used by [`CancelBroadcaster`] to deliver [`CancelMessage`]s.
pub trait CancelSink: Send + Sync {
    /// Sends `msg` to a single `peer`.
    fn send_to(
        &self,
        peer: &PeerId,
        msg: &CancelMessage,
    ) -> impl std::future::Future<Output = crate::error::Result<()>> + Send;
    /// Sends `msg` to all peers. On error the broadcaster queues the message for
    /// `retry_failed_broadcasts`. NOTE(review): the `usize` is presumably the
    /// number of peers reached; confirm against implementations.
    fn broadcast(
        &self,
        msg: &CancelMessage,
    ) -> impl std::future::Future<Output = crate::error::Result<usize>> + Send;
}
/// Point-in-time snapshot of [`CancelBroadcaster`] counters.
#[derive(Clone, Debug, Default)]
pub struct CancelBroadcastMetrics {
    /// Cancellations initiated locally via `prepare_cancel`.
    pub initiated: u64,
    /// Messages accepted (not duplicates) by `receive_message`.
    pub received: u64,
    /// Received messages that were eligible for forwarding.
    pub forwarded: u64,
    /// Received messages dropped as already seen.
    pub duplicates: u64,
    /// Received messages that had exhausted their hop limit.
    pub max_hops_reached: u64,
    /// Messages currently waiting in the retry queue.
    pub pending_retries: u64,
}
/// Propagates cancellations between peers through a [`CancelSink`], applying
/// them to locally registered tokens and suppressing duplicate messages.
pub struct CancelBroadcaster<S: CancelSink> {
    // Known peers (deduplicated in `add_peer`).
    peers: RwLock<SmallVec<[PeerId; 4]>>,
    // Local tokens keyed by object id; cancelled when a matching message arrives.
    active_tokens: RwLock<HashMap<ObjectId, SymbolCancelToken>>,
    // Bounded history of (object id, token id, sequence) keys already handled.
    seen_sequences: RwLock<SeenSequences>,
    // Capacity of `seen_sequences`; oldest keys are evicted beyond it.
    max_seen: usize,
    // Outbound transport.
    sink: S,
    // Next sequence number for locally initiated messages.
    next_sequence: AtomicU64,
    // Messages whose broadcast failed, in FIFO order.
    pending_retries: RwLock<VecDeque<CancelMessage>>,
    // Guards against concurrent `retry_failed_broadcasts` runs.
    retry_in_progress: AtomicBool,
    // Random per-instance value mixed into token ids for unregistered objects.
    sender_tag: u64,
    // Counters reported by `metrics()`.
    initiated: AtomicU64,
    received: AtomicU64,
    forwarded: AtomicU64,
    duplicates: AtomicU64,
    max_hops_reached: AtomicU64,
}
/// Duplicate-suppression key: `(object_id, token_id, sequence)`.
type SeenKey = (ObjectId, u64, u64);
/// Insertion-ordered set of seen keys: `set` answers membership queries and
/// `order` records arrival order so the oldest key can be evicted first.
#[derive(Debug, Default)]
struct SeenSequences {
    set: HashSet<SeenKey>,
    order: VecDeque<SeenKey>,
}
impl SeenSequences {
    /// Records `key`. Returns `false` and leaves both collections untouched
    /// when the key is already present.
    fn insert(&mut self, key: SeenKey) -> bool {
        let fresh = self.set.insert(key);
        if fresh {
            self.order.push_back(key);
        }
        fresh
    }

    /// Evicts and returns the earliest-recorded key, or `None` when empty.
    fn remove_oldest(&mut self) -> Option<SeenKey> {
        if let Some(evicted) = self.order.pop_front() {
            self.set.remove(&evicted);
            return Some(evicted);
        }
        None
    }
}
impl<S: CancelSink> CancelBroadcaster<S> {
    /// Creates a broadcaster over `sink` with a 10 000-entry duplicate history.
    ///
    /// # Panics
    ///
    /// Panics if the OS entropy source used for the per-instance sender tag is
    /// unavailable.
    pub fn new(sink: S) -> Self {
        let mut tag_buf = [0u8; 8];
        getrandom::fill(&mut tag_buf).expect("OS entropy source unavailable");
        let sender_tag = u64::from_ne_bytes(tag_buf);
        Self {
            peers: RwLock::new(SmallVec::new()),
            active_tokens: RwLock::new(HashMap::new()),
            seen_sequences: RwLock::new(SeenSequences::default()),
            max_seen: 10_000,
            sink,
            next_sequence: AtomicU64::new(0),
            sender_tag,
            pending_retries: RwLock::new(VecDeque::new()),
            retry_in_progress: AtomicBool::new(false),
            initiated: AtomicU64::new(0),
            received: AtomicU64::new(0),
            forwarded: AtomicU64::new(0),
            duplicates: AtomicU64::new(0),
            max_hops_reached: AtomicU64::new(0),
        }
    }

    /// Adds `peer` unless it is already known.
    pub fn add_peer(&self, peer: PeerId) {
        let mut peers = self.peers.write();
        if !peers.contains(&peer) {
            peers.push(peer);
        }
    }

    /// Forgets `peer` if present.
    pub fn remove_peer(&self, peer: &PeerId) {
        self.peers.write().retain(|p| p != peer);
    }

    /// Registers `token` under its object id, replacing any previous token.
    pub fn register_token(&self, token: SymbolCancelToken) {
        self.active_tokens.write().insert(token.object_id(), token);
    }

    /// Removes the token registered for `object_id`, if any.
    pub fn unregister_token(&self, object_id: &ObjectId) {
        self.active_tokens.write().remove(object_id);
    }

    /// Cancels the local token for `object_id` (if registered) and builds the
    /// outbound message.
    ///
    /// The message is marked as seen locally so that echoes of it are dropped.
    /// For unregistered objects the token id is derived from the sender tag and
    /// the object id.
    pub fn prepare_cancel(
        &self,
        object_id: ObjectId,
        reason: &CancelReason,
        now: Time,
    ) -> CancelMessage {
        let (token, token_id) = {
            let tokens = self.active_tokens.read();
            tokens.get(&object_id).map_or_else(
                || (None, self.sender_tag ^ object_id.high() ^ object_id.low()),
                |token| (Some(token.clone()), token.token_id()),
            )
        };
        if let Some(token) = token {
            token.cancel(reason, now);
        }
        let sequence = self.next_sequence.fetch_add(1, Ordering::Relaxed);
        let msg = CancelMessage::new(token_id, object_id, reason.kind(), now, sequence);
        self.mark_seen(object_id, msg.token_id(), sequence);
        self.initiated.fetch_add(1, Ordering::Relaxed);
        msg
    }

    /// Handles an incoming message.
    ///
    /// Duplicates are counted and dropped. Otherwise the matching local token (if
    /// any) is cancelled. Returns the message with an incremented hop count when
    /// it may be forwarded further.
    pub fn receive_message(
        &self,
        msg: &CancelMessage,
        _received_at: Time,
    ) -> Option<CancelMessage> {
        // Check-and-insert happens under a single write lock. With separate
        // `is_seen` + `mark_seen` calls, two threads receiving the same message
        // could both pass the check and both process and forward it.
        if !self.mark_seen(msg.object_id(), msg.token_id(), msg.sequence()) {
            self.duplicates.fetch_add(1, Ordering::Relaxed);
            return None;
        }
        self.received.fetch_add(1, Ordering::Relaxed);
        // Clone out of the map so the read lock is released before `cancel`
        // runs listener callbacks.
        let token = self.active_tokens.read().get(&msg.object_id()).cloned();
        if let Some(token) = token {
            let reason = CancelReason::new(msg.kind()).with_timestamp(msg.initiated_at());
            token.cancel(&reason, msg.initiated_at());
        }
        msg.forwarded().map_or_else(
            || {
                self.max_hops_reached.fetch_add(1, Ordering::Relaxed);
                None
            },
            |forwarded| {
                self.forwarded.fetch_add(1, Ordering::Relaxed);
                Some(forwarded)
            },
        )
    }

    /// Prepares and broadcasts a cancellation.
    ///
    /// # Errors
    ///
    /// Returns the sink's error. The message is then queued for
    /// `retry_failed_broadcasts`.
    pub async fn cancel(
        &self,
        object_id: ObjectId,
        reason: &CancelReason,
        now: Time,
    ) -> crate::error::Result<usize> {
        let msg = self.prepare_cancel(object_id, reason, now);
        match self.sink.broadcast(&msg).await {
            Ok(count) => Ok(count),
            Err(err) => {
                self.pending_retries.write().push_back(msg);
                Err(err)
            }
        }
    }

    /// Processes an incoming message and re-broadcasts it if it may be forwarded.
    ///
    /// # Errors
    ///
    /// Returns the sink's error. The forwarded message is then queued for retry.
    pub async fn handle_message(&self, msg: CancelMessage, now: Time) -> crate::error::Result<()> {
        if let Some(forwarded) = self.receive_message(&msg, now) {
            match self.sink.broadcast(&forwarded).await {
                Ok(_) => Ok(()),
                Err(err) => {
                    self.pending_retries.write().push_back(forwarded);
                    Err(err)
                }
            }
        } else {
            Ok(())
        }
    }

    /// Re-broadcasts queued messages in FIFO order until the queue is empty or a
    /// broadcast fails.
    ///
    /// Returns the number of successful retries and the first error hit, if any.
    /// If another retry run is already in progress, returns `(0, None)`
    /// immediately.
    pub async fn retry_failed_broadcasts(&self) -> (usize, Option<crate::error::Error>) {
        // Clears the in-progress flag on every exit path, including cancellation
        // of this future.
        struct RetryGuard<'a>(&'a AtomicBool);
        impl Drop for RetryGuard<'_> {
            fn drop(&mut self) {
                self.0.store(false, Ordering::Release);
            }
        }
        if self
            .retry_in_progress
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            return (0, None);
        }
        let _retry_guard = RetryGuard(&self.retry_in_progress);
        let mut retried_count = 0;
        let mut last_error = None;
        loop {
            let (msg, original_queue_len) = {
                let mut retries = self.pending_retries.write();
                let msg = retries.pop_front();
                let queue_len = retries.len();
                (msg, queue_len)
            };
            let Some(msg) = msg else {
                break;
            };
            match self.sink.broadcast(&msg).await {
                Ok(_) => {
                    retried_count += 1;
                }
                Err(err) => {
                    {
                        let mut retries = self.pending_retries.write();
                        let current_len = retries.len();
                        // If messages were appended while the broadcast was in
                        // flight, the failed message is re-inserted after the
                        // messages already queued behind it (index
                        // `original_queue_len`). Otherwise it goes back to the
                        // front. NOTE(review): confirm this ordering is intended.
                        if current_len > original_queue_len {
                            retries.insert(original_queue_len, msg);
                        } else {
                            retries.push_front(msg);
                        }
                    }
                    last_error = Some(err);
                    break;
                }
            }
        }
        (retried_count, last_error)
    }

    /// Snapshot of all counters plus the current retry-queue length.
    #[must_use]
    pub fn metrics(&self) -> CancelBroadcastMetrics {
        CancelBroadcastMetrics {
            initiated: self.initiated.load(Ordering::Relaxed),
            received: self.received.load(Ordering::Relaxed),
            forwarded: self.forwarded.load(Ordering::Relaxed),
            duplicates: self.duplicates.load(Ordering::Relaxed),
            max_hops_reached: self.max_hops_reached.load(Ordering::Relaxed),
            pending_retries: self.pending_retries.read().len() as u64,
        }
    }

    /// Read-only membership query on the duplicate history.
    ///
    /// `receive_message` no longer calls this because it needs an atomic
    /// check-and-insert. It is kept for diagnostics and tests.
    #[allow(dead_code)]
    fn is_seen(&self, object_id: ObjectId, token_id: u64, sequence: u64) -> bool {
        self.seen_sequences
            .read()
            .set
            .contains(&(object_id, token_id, sequence))
    }

    /// Records the key in the duplicate history, evicting the oldest entries to
    /// stay within `max_seen`.
    ///
    /// Returns `true` if the key was newly recorded and `false` if it was already
    /// present. The check and insert happen under one write lock.
    fn mark_seen(&self, object_id: ObjectId, token_id: u64, sequence: u64) -> bool {
        let key = (object_id, token_id, sequence);
        let mut seen = self.seen_sequences.write();
        if seen.set.contains(&key) {
            return false;
        }
        while seen.set.len() >= self.max_seen {
            if seen.remove_oldest().is_none() {
                break;
            }
        }
        seen.insert(key)
    }
}
/// Per-object cleanup routine run by [`CleanupCoordinator::cleanup`].
pub trait CleanupHandler: Send + Sync {
    /// Releases `symbols` belonging to `object_id`.
    ///
    /// On `Err` the coordinator re-registers this handler and the symbols for a
    /// later retry. The `Ok` value is ignored by the coordinator
    /// (NOTE(review): presumably a count of cleaned symbols; confirm).
    #[allow(clippy::result_large_err)]
    fn cleanup(&self, object_id: ObjectId, symbols: Vec<Symbol>) -> crate::error::Result<usize>;
    /// Name recorded in `CleanupResult::handlers_run` and error messages.
    fn name(&self) -> &'static str;
}
/// Symbols awaiting cleanup for one object, with their saturating total length.
#[derive(Clone)]
struct PendingSymbolSet {
    symbols: Vec<Symbol>,
    // Sum of `symbol.len()` over `symbols`, accumulated with `saturating_add`.
    total_bytes: usize,
    // Creation time of the set (currently unused, hence the underscore).
    _created_at: Time,
}
/// Outcome of a single [`CleanupCoordinator::cleanup`] call.
#[derive(Clone, Debug)]
pub struct CleanupResult {
    /// Object that was cleaned up.
    pub object_id: ObjectId,
    /// Symbols handed to a handler that succeeded (0 otherwise).
    pub symbols_cleaned: usize,
    /// Bytes of those symbols (0 otherwise).
    pub bytes_freed: usize,
    /// `false` when the budget's poll quota was zero and cleanup was deferred.
    pub within_budget: bool,
    /// `true` when nothing remains deferred for this object.
    pub completed: bool,
    /// Names of handlers invoked during this call.
    pub handlers_run: Vec<String>,
    /// Human-readable errors and deferral reasons.
    pub handler_errors: Vec<String>,
}
/// Aggregate view of symbols still pending cleanup.
#[derive(Clone, Debug, Default)]
pub struct CleanupStats {
    /// Objects with a pending symbol set.
    pub pending_objects: usize,
    /// Total pending symbols across all objects.
    pub pending_symbols: usize,
    /// Total pending bytes across all objects.
    pub pending_bytes: usize,
}
/// Scope guard that removes `object_id` from the `active` in-flight set when
/// dropped, so the entry is cleared on every exit path of a cleanup.
struct ActiveCleanupGuard<'a> {
    object_id: ObjectId,
    active: &'a RwLock<HashSet<ObjectId>>,
}
impl Drop for ActiveCleanupGuard<'_> {
    fn drop(&mut self) {
        let mut in_flight = self.active.write();
        in_flight.remove(&self.object_id);
    }
}
/// Tracks symbols awaiting cleanup per object and runs registered handlers.
pub struct CleanupCoordinator {
    // Symbols registered for objects that have not been cleaned up yet.
    pending: RwLock<HashMap<ObjectId, PendingSymbolSet>>,
    // One cleanup handler per object.
    handlers: RwLock<HashMap<ObjectId, Box<dyn CleanupHandler>>>,
    // Objects whose cleanup finished or was cleared; new symbols for them are dropped.
    completed: RwLock<HashSet<ObjectId>>,
    // Symbols that arrived while a cleanup for the object was in progress.
    cleanup_buffer: RwLock<HashMap<ObjectId, Vec<Symbol>>>,
    // Objects with a cleanup currently running (guards against reentrancy).
    cleanup_active: RwLock<HashSet<ObjectId>>,
    // Budget used when `cleanup` is called without one.
    default_budget: Budget,
}
impl CleanupCoordinator {
    /// Creates an empty coordinator whose default budget has a poll quota of 1000.
    #[must_use]
    pub fn new() -> Self {
        Self {
            pending: RwLock::new(HashMap::new()),
            handlers: RwLock::new(HashMap::new()),
            completed: RwLock::new(HashSet::new()),
            cleanup_buffer: RwLock::new(HashMap::new()),
            cleanup_active: RwLock::new(HashSet::new()),
            default_budget: Budget::new().with_poll_quota(1000),
        }
    }

    /// Replaces the budget used when `cleanup` is called with `None`.
    #[must_use]
    pub fn with_default_budget(mut self, budget: Budget) -> Self {
        self.default_budget = budget;
        self
    }

    /// Queues `symbol` for cleanup of `object_id`.
    ///
    /// Symbols for already-completed objects are dropped. While a cleanup for the
    /// object is in progress, symbols go to the side buffer and are merged back
    /// if that cleanup does not complete.
    ///
    /// Lock order: pending -> completed -> cleanup_buffer.
    #[allow(clippy::significant_drop_tightening)]
    pub fn register_pending(&self, object_id: ObjectId, symbol: Symbol, now: Time) {
        let mut pending = self.pending.write();
        if self.completed.read().contains(&object_id) {
            return;
        }
        let mut cleanup_buffer = self.cleanup_buffer.write();
        if cleanup_buffer.contains_key(&object_id) {
            cleanup_buffer.entry(object_id).or_default().push(symbol);
            return;
        }
        drop(cleanup_buffer);
        let set = pending
            .entry(object_id)
            .or_insert_with(|| PendingSymbolSet {
                symbols: Vec::new(),
                total_bytes: 0,
                _created_at: now,
            });
        set.total_bytes = set.total_bytes.saturating_add(symbol.len());
        set.symbols.push(symbol);
    }

    /// Puts `handler` and `pending_set` back for a later retry, merging any
    /// symbols buffered during the attempt.
    ///
    /// If the object was marked completed meanwhile, everything is discarded.
    #[allow(clippy::significant_drop_tightening)]
    fn restore_retry_state(
        &self,
        object_id: ObjectId,
        handler: Box<dyn CleanupHandler>,
        mut pending_set: PendingSymbolSet,
    ) {
        let mut handlers = self.handlers.write();
        let mut pending = self.pending.write();
        let mut completed = self.completed.write();
        if completed.contains(&object_id) {
            self.cleanup_buffer.write().remove(&object_id);
            return;
        }
        handlers.insert(object_id, handler);
        let mut cleanup_buffer = self.cleanup_buffer.write();
        if let Some(buffered_symbols) = cleanup_buffer.remove(&object_id) {
            for symbol in buffered_symbols {
                pending_set.total_bytes = pending_set.total_bytes.saturating_add(symbol.len());
                pending_set.symbols.push(symbol);
            }
        }
        pending.insert(object_id, pending_set);
        completed.remove(&object_id);
    }

    /// Like `restore_retry_state` but without a handler.
    ///
    /// The `pending` lock is held across the buffer merge and the re-insert, so a
    /// concurrent `register_pending` cannot slip a new pending set in between and
    /// have it overwritten.
    #[allow(clippy::significant_drop_tightening)]
    fn restore_pending_only_state(&self, object_id: ObjectId, mut pending_set: PendingSymbolSet) {
        let mut pending = self.pending.write();
        let mut completed = self.completed.write();
        if completed.contains(&object_id) {
            self.cleanup_buffer.write().remove(&object_id);
            return;
        }
        let mut cleanup_buffer = self.cleanup_buffer.write();
        if let Some(buffered_symbols) = cleanup_buffer.remove(&object_id) {
            for symbol in buffered_symbols {
                pending_set.total_bytes = pending_set.total_bytes.saturating_add(symbol.len());
                pending_set.symbols.push(symbol);
            }
        }
        pending.insert(object_id, pending_set);
        completed.remove(&object_id);
    }

    /// Registers (or replaces) the cleanup handler for `object_id`.
    pub fn register_handler(&self, object_id: ObjectId, handler: impl CleanupHandler + 'static) {
        self.handlers.write().insert(object_id, Box::new(handler));
    }

    /// Empty set used to seed restoration of buffered-only symbols.
    #[inline]
    fn empty_pending_set() -> PendingSymbolSet {
        PendingSymbolSet {
            symbols: Vec::new(),
            total_bytes: 0,
            _created_at: Time::ZERO,
        }
    }

    /// Drops the handler and pending symbols for `object_id` and marks it
    /// completed, so later registrations are ignored.
    ///
    /// Returns the number of pending symbols discarded, if a set existed.
    pub fn clear_pending(&self, object_id: &ObjectId) -> Option<usize> {
        self.handlers.write().remove(object_id);
        let mut pending = self.pending.write();
        self.completed.write().insert(*object_id);
        pending.remove(object_id).map(|set| set.symbols.len())
    }

    /// Runs cleanup for `object_id` using `budget`, or the default budget when
    /// `budget` is `None`.
    ///
    /// Reentrant calls for the same object are rejected. Symbols registered
    /// during the run are buffered. On failure, on a zero poll quota, or when no
    /// handler is registered, the pending state is restored for a later attempt.
    pub fn cleanup(&self, object_id: ObjectId, budget: Option<Budget>) -> CleanupResult {
        let budget = budget.unwrap_or(self.default_budget);
        let mut result = CleanupResult {
            object_id,
            symbols_cleaned: 0,
            bytes_freed: 0,
            within_budget: true,
            completed: true,
            handlers_run: Vec::new(),
            handler_errors: Vec::new(),
        };
        let _active_guard = {
            let mut active = self.cleanup_active.write();
            if !active.insert(object_id) {
                result.completed = false;
                result.handler_errors.push(format!(
                    "cleanup already in progress for object {object_id:?}; \
                     rejecting reentrant cleanup attempt (br-asupersync-a19xwn)"
                ));
                return result;
            }
            ActiveCleanupGuard {
                object_id,
                active: &self.cleanup_active,
            }
        };
        // From here on, `register_pending` diverts new symbols into the buffer.
        self.cleanup_buffer.write().entry(object_id).or_default();
        let handler = { self.handlers.write().remove(&object_id) };
        let pending_set = { self.pending.write().remove(&object_id) };
        let had_handler = handler.is_some();
        if let Some(set) = pending_set {
            let symbol_count = set.symbols.len();
            let total_bytes = set.total_bytes;
            if let Some(handler) = handler {
                if budget.poll_quota == 0 {
                    self.restore_retry_state(object_id, handler, set);
                    result.within_budget = false;
                    result.completed = false;
                } else {
                    let handler_name = handler.name().to_string();
                    let retry_set = set.clone();
                    result.handlers_run.push(handler_name.clone());
                    match handler.cleanup(object_id, set.symbols) {
                        Ok(_) => {
                            self.completed.write().insert(object_id);
                            self.cleanup_buffer.write().remove(&object_id);
                            result.symbols_cleaned = symbol_count;
                            result.bytes_freed = total_bytes;
                        }
                        Err(err) => {
                            self.restore_retry_state(object_id, handler, retry_set);
                            result.completed = false;
                            result.handler_errors.push(format!("{handler_name}: {err}"));
                        }
                    }
                }
            } else {
                result.completed = false;
                result.handler_errors.push(format!(
                    "no cleanup handler registered for object {object_id:?}; \
                     {symbol_count} symbol(s) / {total_bytes} byte(s) deferred \
                     (br-asupersync-batcyw)"
                ));
                // Shared restore path. It honours a concurrent `clear_pending` and
                // merges the buffer atomically with the pending re-insert.
                self.restore_pending_only_state(object_id, set);
            }
        } else {
            let buffered_symbol_count = self
                .cleanup_buffer
                .read()
                .get(&object_id)
                .map_or(0, Vec::len);
            if buffered_symbol_count > 0 {
                let new_set = Self::empty_pending_set();
                if let Some(handler) = handler {
                    self.restore_retry_state(object_id, handler, new_set);
                } else {
                    self.restore_pending_only_state(object_id, new_set);
                }
                result.completed = false;
            } else {
                self.cleanup_buffer.write().remove(&object_id);
            }
            if result.completed && had_handler {
                self.completed.write().insert(object_id);
            }
        }
        if result.completed {
            self.handlers.write().remove(&object_id);
        }
        result
    }

    /// Totals over all pending symbol sets.
    ///
    /// Sums saturate, consistent with how `total_bytes` is accumulated per set.
    #[must_use]
    pub fn stats(&self) -> CleanupStats {
        let pending = self.pending.read();
        let (pending_symbols, pending_bytes) =
            pending
                .values()
                .fold((0usize, 0usize), |(symbols, bytes), set| {
                    (
                        symbols.saturating_add(set.symbols.len()),
                        bytes.saturating_add(set.total_bytes),
                    )
                });
        CleanupStats {
            pending_objects: pending.len(),
            pending_symbols,
            pending_bytes,
        }
    }
}
impl Default for CleanupCoordinator {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
#![allow(
clippy::pedantic,
clippy::nursery,
clippy::expect_fun_call,
clippy::map_unwrap_or,
clippy::cast_possible_wrap,
clippy::future_not_send
)]
use super::*;
use crate::conformance::{ConformanceTarget, LabRuntimeTarget, TestConfig};
use crate::runtime::yield_now;
use crate::test_utils::init_test_logging;
use crate::types::symbol::{ObjectId, Symbol};
use serde_json::Value;
use std::sync::Mutex as StdMutex;
use std::sync::atomic::AtomicUsize;
struct CountingCleanupHandler;
impl CleanupHandler for CountingCleanupHandler {
fn cleanup(
&self,
_object_id: ObjectId,
symbols: Vec<Symbol>,
) -> crate::error::Result<usize> {
Ok(symbols.len())
}
fn name(&self) -> &'static str {
"counting"
}
}
struct NullSink;
impl CancelSink for NullSink {
fn send_to(
&self,
_peer: &PeerId,
_msg: &CancelMessage,
) -> impl std::future::Future<Output = crate::error::Result<()>> + Send {
std::future::ready(Ok(()))
}
fn broadcast(
&self,
_msg: &CancelMessage,
) -> impl std::future::Future<Output = crate::error::Result<usize>> + Send {
std::future::ready(Ok(0))
}
}
struct RecordingSink {
label: &'static str,
checkpoints: Arc<StdMutex<Vec<Value>>>,
messages: Arc<StdMutex<Vec<CancelMessage>>>,
}
#[derive(Debug, PartialEq, Eq)]
struct TokenSnapshot {
token_id: u64,
cancelled: bool,
reason_kind: Option<CancelKind>,
cancelled_at_nanos: Option<u64>,
queued_children: usize,
queued_listeners: usize,
}
fn snapshot_token(token: &SymbolCancelToken) -> TokenSnapshot {
TokenSnapshot {
token_id: token.token_id(),
cancelled: token.is_cancelled(),
reason_kind: token.reason().map(|reason| reason.kind),
cancelled_at_nanos: token.cancelled_at().map(Time::as_nanos),
queued_children: token.state.children.read().len(),
queued_listeners: token.state.listeners.read().len(),
}
}
/// Registers a listener on `token` that appends the token's id to `order`
/// each time it is notified.
fn attach_order_listener(token: &SymbolCancelToken, order: &Arc<StdMutex<Vec<u64>>>) {
    let id = token.token_id();
    let sink = Arc::clone(order);
    token.add_listener(move |_: &CancelReason, _: Time| {
        sink.lock().unwrap().push(id);
    });
}
/// Registers a listener on `token` that appends `label` to `order` each time
/// it is notified.
fn attach_named_order_listener(
    token: &SymbolCancelToken,
    label: &'static str,
    order: &Arc<StdMutex<Vec<&'static str>>>,
) {
    let sink = Arc::clone(order);
    let listener = move |_: &CancelReason, _: Time| {
        sink.lock().unwrap().push(label);
    };
    token.add_listener(listener);
}
/// Cancellation-reason view of a token, built by `snapshot_reason`.
#[derive(Debug, PartialEq, Eq)]
struct ReasonSnapshot {
/// Value of `is_cancelled()`.
cancelled: bool,
/// Kind of the top-level stored reason, if any.
kind: Option<CancelKind>,
/// Cancellation timestamp in nanoseconds, if any.
cancelled_at_nanos: Option<u64>,
/// Kinds along the reason's `chain()`, outermost first; empty when there is no reason.
cause_chain: Vec<CancelKind>,
}
fn snapshot_reason(token: &SymbolCancelToken) -> ReasonSnapshot {
let reason = token.reason();
let cause_chain = reason
.as_ref()
.map(|reason| reason.chain().map(|reason| reason.kind).collect())
.unwrap_or_default();
ReasonSnapshot {
cancelled: token.is_cancelled(),
kind: reason.as_ref().map(|reason| reason.kind),
cancelled_at_nanos: token.cancelled_at().map(Time::as_nanos),
cause_chain,
}
}
/// Returns the kinds along `token`'s reason chain, outermost first, or an
/// empty vector when the token has no stored reason.
fn reason_chain_kinds(token: &SymbolCancelToken) -> Vec<CancelKind> {
    let Some(reason) = token.reason() else {
        return Vec::new();
    };
    reason.chain().map(|link| link.kind).collect()
}
fn observable_token_state_json(token: &SymbolCancelToken) -> Value {
serde_json::json!({
"cancelled": token.is_cancelled(),
"cancelled_at_nanos": token.cancelled_at().map(Time::as_nanos),
"queued_children": token.state.children.read().len(),
"queued_listeners": token.state.listeners.read().len(),
"reason_kind": token.reason().map(|reason| format!("{:?}", reason.kind)),
})
}
/// Outcome of one run of `run_descendant_invariant_scenario`.
#[derive(Debug, PartialEq, Eq)]
struct DescendantInvariantScenario {
/// Order in which the "left"/"right" child branches were created under the parent.
creation_order: Vec<&'static str>,
/// Order in which the child-branch listeners fired during the parent cascade.
observed_order: Vec<&'static str>,
/// Left leaf after its own cancellation, before the parent was cancelled.
left_before_parent: ReasonSnapshot,
/// Left leaf after the parent was cancelled.
left_after_parent: ReasonSnapshot,
/// Right child after the parent was cancelled.
right_child_after_parent: ReasonSnapshot,
/// Right leaf (grandchild) after the parent was cancelled.
right_leaf_after_parent: ReasonSnapshot,
}
/// Builds a parent with two child branches, "left" and "right", each holding one
/// leaf grandchild. It then cancels the left leaf directly and afterwards cancels
/// the parent, recording reason snapshots before and after the cascade.
///
/// * `swap_creation_order` - create the "right" branch before the "left" one.
/// * `drop_right_child_handle` - drop the original right-child handle before the
///   parent is cancelled (a cloned observer handle is still kept for snapshotting).
fn run_descendant_invariant_scenario(
swap_creation_order: bool,
drop_right_child_handle: bool,
) -> DescendantInvariantScenario {
// One RNG is shared by every token created here, so the order of creation
// presumably affects the token ids drawn — TODO confirm against `SymbolCancelToken::new`.
let mut rng = DetRng::new(0xCACE_1001);
let parent = SymbolCancelToken::new(ObjectId::new_for_test(77), &mut rng);
// Collects branch labels as each child's listener fires.
let order = Arc::new(StdMutex::new(Vec::<&'static str>::new()));
let creation_order = if swap_creation_order {
vec!["right", "left"]
} else {
vec!["left", "right"]
};
let mut left_child: Option<SymbolCancelToken> = None;
let mut left_leaf: Option<SymbolCancelToken> = None;
let mut right_child: Option<SymbolCancelToken> = None;
let mut right_leaf: Option<SymbolCancelToken> = None;
// For each label: create a child of the parent, attach a labelled listener to it,
// then create a leaf under that child.
for label in &creation_order {
let child = parent.child(&mut rng);
attach_named_order_listener(&child, label, &order);
let leaf = child.child(&mut rng);
match *label {
"left" => {
left_child = Some(child);
left_leaf = Some(leaf);
}
"right" => {
right_child = Some(child);
right_leaf = Some(leaf);
}
_ => unreachable!("unexpected branch label"),
}
}
let left_leaf = left_leaf.expect("left leaf should be created");
let right_leaf_observer = right_leaf.expect("right leaf should be created");
// Independent handle on the right child, so it can still be observed even if
// the original `right_child` handle is dropped below.
let right_child_observer = right_child
.as_ref()
.expect("right child should be created")
.clone();
// Cancel the left leaf on its own first, with a Shutdown -> Timeout -> User cause chain.
let descendant_reason = CancelReason::shutdown()
.with_cause(CancelReason::timeout().with_cause(CancelReason::user("left-root-cause")));
let descendant_at = Time::from_millis(15);
assert!(left_leaf.cancel(&descendant_reason, descendant_at));
let left_before_parent = snapshot_reason(&left_leaf);
if drop_right_child_handle {
drop(right_child.take());
}
// The left child's only remaining local handle is dropped before the cascade.
drop(left_child);
// Cascade from the parent at a later time with a plain User reason.
assert!(parent.cancel(&CancelReason::user("parent-cascade"), Time::from_millis(30)));
DescendantInvariantScenario {
creation_order,
observed_order: order.lock().unwrap().clone(),
left_before_parent,
left_after_parent: snapshot_reason(&left_leaf),
right_child_after_parent: snapshot_reason(&right_child_observer),
right_leaf_after_parent: snapshot_reason(&right_leaf_observer),
}
}
impl CancelSink for RecordingSink {
    /// Point-to-point sends are accepted and otherwise ignored.
    fn send_to(
        &self,
        _peer: &PeerId,
        _msg: &CancelMessage,
    ) -> impl std::future::Future<Output = crate::error::Result<()>> + Send {
        std::future::ready(Ok(()))
    }

    /// Records a JSON checkpoint and a copy of `msg`, yields once to the
    /// scheduler, and reports one peer reached.
    fn broadcast(
        &self,
        msg: &CancelMessage,
    ) -> impl std::future::Future<Output = crate::error::Result<usize>> + Send {
        // Take owned copies up front so the returned future borrows nothing from `self`.
        let (label, checkpoints, messages) = (
            self.label,
            Arc::clone(&self.checkpoints),
            Arc::clone(&self.messages),
        );
        let message = msg.clone();
        async move {
            let event = serde_json::json!({
                "phase": format!("{label}_broadcast"),
                "kind": format!("{:?}", message.kind()),
                "sequence": message.sequence(),
                "hops": message.hops(),
            });
            tracing::info!(event = %event, "symbol_cancel_lab_checkpoint");
            // Both lock guards are temporaries confined to this scope, so neither
            // is held across the yield point that follows.
            {
                checkpoints.lock().unwrap().push(event);
                messages.lock().unwrap().push(message);
            }
            yield_now().await;
            Ok(1)
        }
    }
}
/// A freshly created token reports its object id and has no cancellation state.
#[test]
fn test_token_creation() {
    let object = ObjectId::new_for_test(1);
    let mut rng = DetRng::new(42);
    let token = SymbolCancelToken::new(object, &mut rng);
    assert_eq!(token.object_id(), object);
    assert!(!token.is_cancelled());
    assert!(token.reason().is_none());
    assert!(token.cancelled_at().is_none());
}
/// `new_for_test` can mint two tokens with the same id that nonetheless have
/// independent state, which is why it must stay test-gated.
#[test]
fn test_new_for_test_is_a_forgery_primitive_and_must_be_gated_wm9h2a() {
    let object_id = ObjectId::new_for_test(0xdead_beef);
    let shared_token_id = 0x1111_2222_3333_4444;
    let forged_a = SymbolCancelToken::new_for_test(shared_token_id, object_id);
    let forged_b = SymbolCancelToken::new_for_test(shared_token_id, object_id);
    for forged in [&forged_a, &forged_b] {
        assert_eq!(forged.object_id(), object_id);
    }
    forged_a.cancel(&CancelReason::user("forgery-A"), Time::from_millis(1));
    assert!(forged_a.is_cancelled());
    assert!(
        !forged_b.is_cancelled(),
        "two new_for_test tokens with the same id must not share state — \
         this confirms the constructor is a forgery primitive that MUST \
         stay gated behind test or test-internals"
    );
}
/// Only the first cancel reports a transition. A later cancel still replaces
/// the stored reason (here User -> Timeout).
#[test]
fn test_token_cancel_once() {
    let mut rng = DetRng::new(42);
    let token = SymbolCancelToken::new(ObjectId::new_for_test(1), &mut rng);
    let first_at = Time::from_millis(100);
    let first_transitioned = token.cancel(&CancelReason::user("test"), first_at);
    assert!(first_transitioned);
    assert!(token.is_cancelled());
    assert_eq!(token.reason().unwrap().kind, CancelKind::User);
    assert_eq!(token.cancelled_at(), Some(first_at));
    let second_transitioned = token.cancel(&CancelReason::timeout(), Time::from_millis(200));
    assert!(!second_transitioned);
    assert_eq!(token.reason().unwrap().kind, CancelKind::Timeout);
}
/// Cancelling at `Time::MAX` stores a timestamp one nanosecond below
/// `u64::MAX`, keeping clear of the top value (a sentinel, per the test name).
#[test]
fn test_token_cancel_clamps_time_max_away_from_sentinel() {
    let mut rng = DetRng::new(42);
    let token = SymbolCancelToken::new(ObjectId::new_for_test(1), &mut rng);
    let transitioned = token.cancel(&CancelReason::timeout(), Time::MAX);
    assert!(transitioned);
    assert!(token.is_cancelled());
    assert_eq!(token.reason().unwrap().kind, CancelKind::Timeout);
    let clamped = Time::from_nanos(u64::MAX - 1);
    assert_eq!(token.cancelled_at(), Some(clamped));
}
/// The reason passed to `cancel`, including its message, is what `reason()` returns.
#[test]
fn test_token_reason_propagates() {
    let mut rng = DetRng::new(42);
    let token = SymbolCancelToken::new(ObjectId::new_for_test(1), &mut rng);
    token.cancel(
        &CancelReason::timeout().with_message("timed out"),
        Time::from_millis(500),
    );
    let stored = token.reason().unwrap();
    assert_eq!(stored.kind, CancelKind::Timeout);
    assert_eq!(stored.message.as_deref(), Some("timed out"));
}
/// Cancelling a parent cancels its child with `ParentCancelled`, and the
/// parent's reason appears as the cause.
#[test]
fn test_token_child_inherits_cancellation() {
    let mut rng = DetRng::new(42);
    let root = SymbolCancelToken::new(ObjectId::new_for_test(1), &mut rng);
    let descendant = root.child(&mut rng);
    assert!(!descendant.is_cancelled());
    root.cancel(&CancelReason::user("test"), Time::from_millis(100));
    assert!(descendant.is_cancelled());
    assert_eq!(descendant.reason().unwrap().kind, CancelKind::ParentCancelled);
    let expected_chain = vec![CancelKind::ParentCancelled, CancelKind::User];
    assert_eq!(
        reason_chain_kinds(&descendant),
        expected_chain,
        "child cancellation should carry the root parent reason as a cause"
    );
}
/// A registered listener fires when the token is cancelled, and not before.
#[test]
fn test_token_listener_notified() {
    use std::sync::atomic::{AtomicBool, Ordering};
    let fired = Arc::new(AtomicBool::new(false));
    let mut rng = DetRng::new(42);
    let token = SymbolCancelToken::new(ObjectId::new_for_test(1), &mut rng);
    {
        let fired = Arc::clone(&fired);
        token.add_listener(move |_reason: &CancelReason, _at: Time| {
            fired.store(true, Ordering::SeqCst);
        });
    }
    assert!(!fired.load(Ordering::SeqCst));
    token.cancel(&CancelReason::user("test"), Time::from_millis(100));
    assert!(fired.load(Ordering::SeqCst));
}
/// Reordering sibling creation or dropping a sibling handle must not change
/// what the descendant tokens observe after a parent cascade.
#[test]
fn metamorphic_descendant_cancellation_observable_under_reorder_and_drop() {
    // Arguments: (swap_creation_order, drop_right_child_handle).
    let baseline = run_descendant_invariant_scenario(false, false);
    let swapped = run_descendant_invariant_scenario(true, false);
    let dropped = run_descendant_invariant_scenario(false, true);

    let expected_child_chain = vec![CancelKind::ParentCancelled, CancelKind::User];
    let expected_leaf_chain = vec![
        CancelKind::ParentCancelled,
        CancelKind::ParentCancelled,
        CancelKind::User,
    ];

    // Invariants that every variant must satisfy on its own.
    for scenario in [&baseline, &swapped, &dropped] {
        assert_eq!(
            scenario.observed_order, scenario.creation_order,
            "sibling cancellation listener order should follow child registration order"
        );
        assert_eq!(
            scenario.left_before_parent, scenario.left_after_parent,
            "a self-cancelled descendant must remain observable with the same cause chain after parent cancellation"
        );
        assert_eq!(
            scenario.right_child_after_parent.kind,
            Some(CancelKind::ParentCancelled),
            "uncancelled sibling should be cancelled by the parent cascade"
        );
        assert_eq!(
            scenario.right_leaf_after_parent.kind,
            Some(CancelKind::ParentCancelled),
            "grandchild under the uncancelled sibling should inherit parent cancellation"
        );
        assert_eq!(
            scenario.right_child_after_parent.cause_chain, expected_child_chain,
            "sibling child should retain the parent cancellation as its cause"
        );
        assert_eq!(
            scenario.right_leaf_after_parent.cause_chain, expected_leaf_chain,
            "dropped-handle descendant should preserve the full parent-cancelled cause chain"
        );
    }

    // Absolute expectations on the baseline's directly-cancelled left leaf.
    assert_eq!(
        baseline.left_after_parent.kind,
        Some(CancelKind::Shutdown),
        "the stronger descendant cancellation should not be weakened by a later parent cascade"
    );
    assert_eq!(
        baseline.left_after_parent.cause_chain,
        vec![CancelKind::Shutdown, CancelKind::Timeout, CancelKind::User],
        "descendant cause chain should remain intact"
    );

    // Metamorphic relations: each variant must match the baseline snapshot for snapshot.
    let cross_checks: [(&ReasonSnapshot, &ReasonSnapshot, &str); 6] = [
        (
            &baseline.left_after_parent,
            &swapped.left_after_parent,
            "sibling creation order should not change descendant observability",
        ),
        (
            &baseline.left_after_parent,
            &dropped.left_after_parent,
            "dropping a sibling handle must not corrupt an already-cancelled descendant",
        ),
        (
            &baseline.right_child_after_parent,
            &swapped.right_child_after_parent,
            "sibling reordering should not change cascade outcome",
        ),
        (
            &baseline.right_child_after_parent,
            &dropped.right_child_after_parent,
            "dropping the sibling handle must preserve child cancellation outcome",
        ),
        (
            &baseline.right_leaf_after_parent,
            &swapped.right_leaf_after_parent,
            "sibling reordering should not change leaf cascade outcome",
        ),
        (
            &baseline.right_leaf_after_parent,
            &dropped.right_leaf_after_parent,
            "dropping the sibling handle must preserve descendant cascade outcome",
        ),
    ];
    for (expected, actual, message) in cross_checks {
        assert_eq!(expected, actual, "{message}");
    }
}
/// A token survives a `to_bytes` / `from_bytes` round trip with its ids intact.
#[test]
fn test_token_serialization() {
    let object = ObjectId::new(0x1234_5678_9abc_def0, 0xfedc_ba98_7654_3210);
    let mut rng = DetRng::new(42);
    let original = SymbolCancelToken::new(object, &mut rng);
    let wire = original.to_bytes();
    assert_eq!(wire.len(), TOKEN_WIRE_SIZE);
    let decoded = SymbolCancelToken::from_bytes(&wire).unwrap();
    assert_eq!(decoded.token_id(), original.token_id());
    assert_eq!(decoded.object_id(), original.object_id());
    assert!(!decoded.is_cancelled());
}
/// A token decoded in the cancelled state carries no reason. A later cancel
/// does not report a transition, but it does record the supplied reason.
#[test]
fn test_token_cancel_sets_reason_when_already_cancelled() {
    let mut rng = DetRng::new(42);
    let source = SymbolCancelToken::new(ObjectId::new_for_test(1), &mut rng);
    source.cancel(&CancelReason::user("initial"), Time::from_millis(100));
    let wire = source.to_bytes();
    let decoded = SymbolCancelToken::from_bytes(&wire).unwrap();
    assert!(decoded.is_cancelled());
    assert!(decoded.reason().is_none());
    let transitioned = decoded.cancel(&CancelReason::timeout(), Time::from_millis(200));
    assert!(!transitioned);
    assert_eq!(decoded.reason().unwrap().kind, CancelKind::Timeout);
}
/// Golden test for the token wire encoding. Per the expected strings below, the
/// 25-byte encoding is the 8-byte token id, then the 16-byte object id (high
/// half first), then one cancelled-flag byte (00 / 01).
#[test]
fn test_cancel_token_transition_serialization_golden() {
// Fixed seed: the `SymbolCancelToken::new` scenarios draw from this RNG in vec order,
// so reordering them would presumably change the golden token ids — TODO confirm.
let mut rng = DetRng::new(0x1337_beef_cafe_dead);
let scenarios = vec![
("fresh_token", {
let obj = ObjectId::new(0x1111_2222_3333_4444, 0x5555_6666_7777_8888);
SymbolCancelToken::new(obj, &mut rng)
}),
("cancelled_token", {
let obj = ObjectId::new(0xaaaa_bbbb_cccc_dddd, 0xeeee_ffff_0000_1111);
let token = SymbolCancelToken::new(obj, &mut rng);
token.cancel(
&CancelReason::timeout(),
crate::types::Time::from_millis(1000),
);
token
}),
("test_token_minimal", {
SymbolCancelToken::new_for_test(0x1234_5678_9abc_def0, ObjectId::new(0x0, 0x1))
}),
("test_token_max_values", {
let token = SymbolCancelToken::new_for_test(
0xffff_ffff_ffff_ffff,
ObjectId::new(0xdead_beef_cafe_babe, 0x1337_1337_1337_1337),
);
token.cancel(
&CancelReason::user("test"),
crate::types::Time::from_millis(9999),
);
token
}),
];
// Render each token's ids, flag, and wire bytes (comma-separated and as one hex
// string), then compare against the stored golden text.
for (name, token) in scenarios {
let bytes = token.to_bytes();
let hex_output = format!(
"Token: {}\n\
Token ID: 0x{:016x}\n\
Object ID: 0x{:016x}:0x{:016x}\n\
Cancelled: {}\n\
Wire bytes: [{}]\n\
Hex: {}",
name,
token.token_id(),
token.object_id().high(),
token.object_id().low(),
token.is_cancelled(),
bytes
.iter()
.map(|b| format!("{:02x}", b))
.collect::<Vec<_>>()
.join(", "),
bytes
.iter()
.map(|b| format!("{:02x}", b))
.collect::<String>()
);
// Golden expectations; must be updated deliberately if the wire format changes.
let expected = match name {
"fresh_token" => concat!(
"Token: fresh_token\n",
"Token ID: 0xc35d712d21a92850\n",
"Object ID: 0x1111222233334444:0x5555666677778888\n",
"Cancelled: false\n",
"Wire bytes: [c3, 5d, 71, 2d, 21, a9, 28, 50, 11, 11, 22, 22, 33, 33, 44, 44, 55, 55, 66, 66, 77, 77, 88, 88, 00]\n",
"Hex: c35d712d21a928501111222233334444555566667777888800"
),
"cancelled_token" => concat!(
"Token: cancelled_token\n",
"Token ID: 0x24c64de6e8aa6e00\n",
"Object ID: 0xaaaabbbbccccdddd:0xeeeeffff00001111\n",
"Cancelled: true\n",
"Wire bytes: [24, c6, 4d, e6, e8, aa, 6e, 00, aa, aa, bb, bb, cc, cc, dd, dd, ee, ee, ff, ff, 00, 00, 11, 11, 01]\n",
"Hex: 24c64de6e8aa6e00aaaabbbbccccddddeeeeffff0000111101"
),
"test_token_minimal" => concat!(
"Token: test_token_minimal\n",
"Token ID: 0x123456789abcdef0\n",
"Object ID: 0x0000000000000000:0x0000000000000001\n",
"Cancelled: false\n",
"Wire bytes: [12, 34, 56, 78, 9a, bc, de, f0, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 01, 00]\n",
"Hex: 123456789abcdef00000000000000000000000000000000100"
),
"test_token_max_values" => concat!(
"Token: test_token_max_values\n",
"Token ID: 0xffffffffffffffff\n",
"Object ID: 0xdeadbeefcafebabe:0x1337133713371337\n",
"Cancelled: true\n",
"Wire bytes: [ff, ff, ff, ff, ff, ff, ff, ff, de, ad, be, ef, ca, fe, ba, be, 13, 37, 13, 37, 13, 37, 13, 37, 01]\n",
"Hex: ffffffffffffffffdeadbeefcafebabe133713371337133701"
),
_ => unreachable!("unknown cancel token serialization scenario: {name}"),
};
assert_eq!(hex_output, expected);
}
}
/// Records the observable state of a parent token and its children across four
/// phases, and compares the trace against an insta JSON snapshot:
/// fresh -> first cancel (User) -> late child created -> second cancel (Shutdown).
#[test]
fn cancel_token_phase_transition_trace_canonical() {
let mut rng = DetRng::new(0x53A9_0001_0002_0003);
let parent = SymbolCancelToken::new(
ObjectId::new(0x1111_2222_3333_4444, 0x5555_6666_7777_8888),
&mut rng,
);
// Child registered before any cancellation.
let preexisting_child = parent.child(&mut rng);
// Every listener invocation on the parent is recorded as {at_nanos, kind}.
let listener_events = Arc::new(StdMutex::new(Vec::<Value>::new()));
let listener_events_for_callback = Arc::clone(&listener_events);
parent.add_listener(move |reason: &CancelReason, at: Time| {
listener_events_for_callback
.lock()
.unwrap()
.push(serde_json::json!({
"at_nanos": at.as_nanos(),
"kind": format!("{:?}", reason.kind),
}));
});
// Phase 1: nothing cancelled yet.
let fresh_parent = observable_token_state_json(&parent);
let fresh_preexisting_child = observable_token_state_json(&preexisting_child);
// Phase 2: first cancellation, which must report a transition.
let first_cancel_at = Time::from_nanos(991);
assert!(
parent.cancel(&CancelReason::user("phase-zero"), first_cancel_at),
"first cancel should transition the token"
);
let after_first_cancel_events = listener_events.lock().unwrap().clone();
let after_first_cancel_parent = observable_token_state_json(&parent);
let after_first_cancel_preexisting_child = observable_token_state_json(&preexisting_child);
// Phase 3: create a child after the parent is already cancelled.
let late_child = parent.child(&mut rng);
let after_late_child_parent = observable_token_state_json(&parent);
let after_late_child_late_child = observable_token_state_json(&late_child);
// Phase 4: cancel again with Shutdown. The return value is recorded in the snapshot
// rather than asserted here.
let strengthened_returned_first_caller =
parent.cancel(&CancelReason::shutdown(), Time::from_nanos(4096));
let after_strengthen_events = listener_events.lock().unwrap().clone();
let after_strengthen_parent = observable_token_state_json(&parent);
let after_strengthen_preexisting_child = observable_token_state_json(&preexisting_child);
let after_strengthen_late_child = observable_token_state_json(&late_child);
let trace = serde_json::json!({
"fresh": {
"parent": fresh_parent,
"preexisting_child": fresh_preexisting_child,
},
"after_first_cancel": {
"listener_events": after_first_cancel_events,
"parent": after_first_cancel_parent,
"preexisting_child": after_first_cancel_preexisting_child,
},
"after_late_child": {
"late_child": after_late_child_late_child,
"parent": after_late_child_parent,
},
"after_strengthen": {
"late_child": after_strengthen_late_child,
"listener_events": after_strengthen_events,
"parent": after_strengthen_parent,
"preexisting_child": after_strengthen_preexisting_child,
"strengthened_returned_first_caller": strengthened_returned_first_caller,
},
});
insta::assert_json_snapshot!("cancel_token_phase_transition_trace_canonical", trace);
}
/// Regression test for br-asupersync-64ijds. A panicking listener must not
/// unwind out of `cancel` (first cancellation or re-notification) or out of a
/// late `add_listener` on an already-cancelled token. It also must not prevent
/// a listener registered after it from being notified.
#[test]
fn listener_panic_does_not_propagate_to_cancel_caller() {
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Listener that panics on every notification.
    struct PanickingListener;
    impl CancelListener for PanickingListener {
        fn on_cancel(&self, _reason: &CancelReason, _at: Time) {
            panic!("br-asupersync-64ijds: listener intentionally panics");
        }
    }

    /// Listener that counts how many times it has been notified.
    struct CountingListener {
        calls: Arc<AtomicU64>,
    }
    impl CancelListener for CountingListener {
        fn on_cancel(&self, _reason: &CancelReason, _at: Time) {
            self.calls.fetch_add(1, Ordering::Relaxed);
        }
    }

    let mut rng = DetRng::new(64);
    let token = SymbolCancelToken::new(ObjectId::new_for_test(1), &mut rng);
    // The panicking listener is registered ahead of the counting one.
    token.add_listener(PanickingListener);
    let calls = Arc::new(AtomicU64::new(0));
    token.add_listener(CountingListener {
        calls: Arc::clone(&calls),
    });

    // First cancellation: must not unwind, and must still report a transition.
    let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
        token.cancel(&CancelReason::user("initial"), Time::from_millis(100))
    }));
    assert!(
        result.is_ok(),
        "br-asupersync-64ijds: cancel must not propagate listener panic"
    );
    // Assert the boolean directly rather than comparing it to `true`
    // (clippy::bool_assert_comparison).
    assert!(result.unwrap(), "first cancel should return true");
    assert_eq!(
        calls.load(Ordering::Relaxed),
        1,
        "br-asupersync-64ijds: counting listener must fire even when prior listener panicked"
    );
    assert!(token.is_cancelled());

    // Second cancel with a different reason re-notifies listeners; again no unwinding.
    let result2 = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
        token.cancel(&CancelReason::timeout(), Time::from_millis(200))
    }));
    assert!(
        result2.is_ok(),
        "br-asupersync-64ijds: renotification must not propagate listener panic"
    );
    assert!(
        calls.load(Ordering::Relaxed) >= 2,
        "counting listener must fire on renotification"
    );

    // Adding a panicking listener to an already-cancelled token must not unwind either.
    let result3 = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
        token.add_listener(PanickingListener);
    }));
    assert!(
        result3.is_ok(),
        "br-asupersync-64ijds: late-add must not propagate listener panic"
    );
}
/// A token decoded in the cancelled state must replay the cancellation to a
/// listener added afterwards. The replay carries `Time::ZERO`, because the
/// decoded token has no stored cancellation timestamp.
#[test]
fn test_deserialized_cancelled_token_notifies_listener() {
    use std::sync::{
        Mutex,
        atomic::{AtomicBool, Ordering},
    };
    let mut rng = DetRng::new(42);
    let cancel_handle = SymbolCancelToken::new(ObjectId::new_for_test(1), &mut rng);
    cancel_handle.cancel(&CancelReason::user("initial"), Time::from_millis(100));
    let parsed = SymbolCancelToken::from_bytes(&cancel_handle.to_bytes()).unwrap();
    assert!(parsed.is_cancelled());
    let notified = Arc::new(AtomicBool::new(false));
    // Each listener capture gets its own Arc handle; the originals stay here for the asserts.
    let notified_clone = Arc::clone(&notified);
    let seen_at = Arc::new(Mutex::new(None::<Time>));
    let seen_at_clone = Arc::clone(&seen_at);
    // The token is already cancelled, so adding the listener should invoke it immediately.
    parsed.add_listener(move |_reason: &CancelReason, at: Time| {
        notified_clone.store(true, Ordering::SeqCst);
        *seen_at_clone.lock().unwrap() = Some(at);
    });
    assert!(notified.load(Ordering::SeqCst));
    assert_eq!(
        *seen_at.lock().unwrap(),
        Some(Time::ZERO),
        "deserialized cancelled tokens must replay with Time::ZERO instead of deadlocking"
    );
}
/// A cancel message survives a `to_bytes` / `from_bytes` round trip field by field.
#[test]
fn test_message_serialization() {
    let original = CancelMessage::new(
        0x1234_5678_9abc_def0,
        ObjectId::new_for_test(42),
        CancelKind::Timeout,
        Time::from_millis(1000),
        999,
    )
    .with_max_hops(5);
    let wire = original.to_bytes();
    assert_eq!(wire.len(), MESSAGE_WIRE_SIZE);
    let decoded = CancelMessage::from_bytes(&wire).unwrap();
    assert_eq!(decoded.token_id(), original.token_id());
    assert_eq!(decoded.object_id(), original.object_id());
    assert_eq!(decoded.kind(), original.kind());
    assert_eq!(decoded.initiated_at(), original.initiated_at());
    assert_eq!(decoded.sequence(), original.sequence());
}
/// A message with `max_hops = 3` can be forwarded exactly three times, after
/// which `forwarded()` returns `None`.
#[test]
fn test_message_hop_limit() {
    let origin = CancelMessage::new(
        1,
        ObjectId::new_for_test(1),
        CancelKind::User,
        Time::from_millis(100),
        0,
    )
    .with_max_hops(3);
    assert!(origin.can_forward());
    assert_eq!(origin.hops(), 0);
    let mut current = origin;
    for expected_hops in 1..=3 {
        current = current.forwarded().unwrap();
        assert_eq!(current.hops(), expected_hops);
    }
    assert!(current.forwarded().is_none());
    assert!(!current.can_forward());
}
/// Receiving the same message twice is counted once as received and once as a
/// duplicate, and the replay is not forwarded.
#[test]
fn test_broadcaster_deduplication() {
    let broadcaster = CancelBroadcaster::new(NullSink);
    let now = Time::from_millis(100);
    let message = CancelMessage::new(1, ObjectId::new_for_test(1), CancelKind::User, now, 0);
    // Only the replay's result matters here.
    let _ = broadcaster.receive_message(&message, now);
    let replay = broadcaster.receive_message(&message, now);
    assert!(replay.is_none());
    let metrics = broadcaster.metrics();
    assert_eq!(metrics.received, 1);
    assert_eq!(metrics.duplicates, 1);
}
/// When a token is registered for the object, `prepare_cancel` stamps that
/// token's id on the outgoing message.
#[test]
fn test_prepare_cancel_uses_token_id() {
    let object_id = ObjectId::new_for_test(42);
    let mut rng = DetRng::new(7);
    let registered = SymbolCancelToken::new(object_id, &mut rng);
    let expected_id = registered.token_id();
    let broadcaster = CancelBroadcaster::new(NullSink);
    broadcaster.register_token(registered);
    let prepared = broadcaster.prepare_cancel(
        object_id,
        &CancelReason::user("cancel"),
        Time::from_millis(10),
    );
    assert_eq!(prepared.token_id(), expected_id);
}
/// Two broadcasters with no registered token must produce different synthetic
/// token ids for the same object. Otherwise a receiver would drop the second
/// sender's cancel as a duplicate.
#[test]
fn cross_sender_synthetic_token_id_does_not_collide() {
    let object_id = ObjectId::new_for_test(0xCAFE);
    let reason = CancelReason::user("cross-sender test");
    let (first_sender, second_sender) = (
        CancelBroadcaster::new(NullSink),
        CancelBroadcaster::new(NullSink),
    );
    let from_first = first_sender.prepare_cancel(object_id, &reason, Time::from_millis(10));
    let from_second = second_sender.prepare_cancel(object_id, &reason, Time::from_millis(20));
    assert_ne!(
        from_first.token_id(),
        from_second.token_id(),
        "br-asupersync-ml5ba5: two broadcasters must produce distinct synthetic token_ids"
    );
    let receiver = CancelBroadcaster::new(NullSink);
    let first_forward = receiver.receive_message(&from_first, Time::from_millis(30));
    let second_forward = receiver.receive_message(&from_second, Time::from_millis(40));
    assert!(first_forward.is_some(), "first cancel must forward");
    assert!(
        second_forward.is_some(),
        "br-asupersync-ml5ba5: second sender's cancel must NOT be suppressed as duplicate"
    );
}
/// A single broadcaster reuses the same synthetic token id for repeated
/// cancels of the same object.
#[test]
fn same_sender_synthetic_token_id_is_stable() {
    let object_id = ObjectId::new_for_test(0xBEEF);
    let reason = CancelReason::user("stable");
    let broadcaster = CancelBroadcaster::new(NullSink);
    let [first_id, second_id] = [10, 20].map(|millis| {
        broadcaster
            .prepare_cancel(object_id, &reason, Time::from_millis(millis))
            .token_id()
    });
    assert_eq!(
        first_id, second_id,
        "br-asupersync-ml5ba5: same broadcaster must produce stable synthetic token_id"
    );
}
/// A fresh message is forwarded with its hop count incremented, and the
/// metrics count it as both received and forwarded.
#[test]
fn test_broadcaster_forwards_message() {
    let broadcaster = CancelBroadcaster::new(NullSink);
    let at = Time::from_millis(100);
    let incoming = CancelMessage::new(1, ObjectId::new_for_test(1), CancelKind::User, at, 0);
    let forwarded = broadcaster.receive_message(&incoming, at);
    assert!(forwarded.is_some());
    let hops_after_forward = forwarded.map(|next| next.hops());
    assert_eq!(hops_after_forward, Some(1));
    let metrics = broadcaster.metrics();
    assert_eq!(metrics.received, 1);
    assert_eq!(metrics.forwarded, 1);
}
/// Regression test for br-asupersync-zmeazg. A remotely received cancel must
/// timestamp the local token, its children and its listeners with the
/// message's origin `initiated_at`, not the local receipt time.
#[test]
fn receive_message_preserves_origin_initiated_at_for_local_tokens() {
    let object_id = ObjectId::new_for_test(88);
    let mut rng = DetRng::new(88);
    let token = SymbolCancelToken::new(object_id, &mut rng);
    let child = token.child(&mut rng);
    let listener_at = Arc::new(StdMutex::new(None::<Time>));
    {
        let listener_at = Arc::clone(&listener_at);
        token.add_listener(move |_reason: &CancelReason, at: Time| {
            *listener_at.lock().unwrap() = Some(at);
        });
    }
    let broadcaster = CancelBroadcaster::new(NullSink);
    broadcaster.register_token(token.clone());
    let (initiated_at, received_at) = (Time::from_millis(125), Time::from_millis(500));
    let remote = CancelMessage::new(
        token.token_id(),
        object_id,
        CancelKind::Shutdown,
        initiated_at,
        0,
    );
    let forwarded = broadcaster.receive_message(&remote, received_at);
    assert!(forwarded.is_some(), "fresh cancel should still forward");
    assert_eq!(
        token.cancelled_at(),
        Some(initiated_at),
        "br-asupersync-zmeazg: remote cancel must preserve origin initiated_at"
    );
    assert_eq!(
        child.cancelled_at(),
        Some(initiated_at),
        "child cascade should inherit the same origin initiated_at"
    );
    assert_eq!(
        *listener_at.lock().unwrap(),
        Some(initiated_at),
        "listener callbacks must observe the origin initiated_at, not local receipt time"
    );
}
/// End-to-end lab-runtime test.
///
/// A local broadcaster cancels an object; the resulting broadcast message is
/// then handed to a remote broadcaster. The remote token's listener creates a
/// new child while cancellation is in progress. The test checks that the
/// remote token, its pre-existing child, and that late child are all cancelled
/// by the time `handle_message` returns. It also checks that the runtime
/// oracles report no violations.
#[test]
fn cancel_broadcast_drains_remote_children_under_lab_runtime() {
init_test_logging();
crate::test_phase!("cancel_broadcast_drains_remote_children_under_lab_runtime");
// Deterministic lab runtime: fixed seed, tracing on, bounded step count.
let config = TestConfig::new()
.with_seed(0xCAA0_CE11)
.with_tracing(true)
.with_max_steps(20_000);
let mut runtime = LabRuntimeTarget::create_runtime(config);
// Shared logs: JSON checkpoints from both sides, plus each sink's broadcast messages.
let checkpoints = Arc::new(StdMutex::new(Vec::<Value>::new()));
let local_messages = Arc::new(StdMutex::new(Vec::<CancelMessage>::new()));
let remote_messages = Arc::new(StdMutex::new(Vec::<CancelMessage>::new()));
let (
local_cancelled,
remote_cancelled,
remote_child_cancelled,
late_child_cancelled,
remote_reason,
remote_metrics,
checkpoints,
) = LabRuntimeTarget::block_on(&mut runtime, async move {
let cx = crate::cx::Cx::current().expect("lab runtime should install a current Cx");
let local_spawn_cx = cx.clone();
let remote_spawn_cx = cx.clone();
// Both sides refer to the same object id.
let object_id = ObjectId::new_for_test(44);
let local_sink = RecordingSink {
label: "local",
checkpoints: Arc::clone(&checkpoints),
messages: Arc::clone(&local_messages),
};
let remote_sink = RecordingSink {
label: "remote",
checkpoints: Arc::clone(&checkpoints),
messages: Arc::clone(&remote_messages),
};
let local_broadcaster = Arc::new(CancelBroadcaster::new(local_sink));
let remote_broadcaster = Arc::new(CancelBroadcaster::new(remote_sink));
let mut local_rng = DetRng::new(101);
let local_token = SymbolCancelToken::new(object_id, &mut local_rng);
local_broadcaster.register_token(local_token.clone());
let mut remote_rng = DetRng::new(202);
let remote_token = SymbolCancelToken::new(object_id, &mut remote_rng);
// Child that exists before any cancellation.
let remote_child = remote_token.child(&mut remote_rng);
// Slot filled by the remote listener with a child created during cancellation.
let late_child = Arc::new(StdMutex::new(None::<SymbolCancelToken>));
let late_child_listener = Arc::clone(&late_child);
let listener_checkpoints = Arc::clone(&checkpoints);
let remote_token_for_listener = remote_token.clone();
// Remote listener: logs a checkpoint, then creates a new child of the token
// that is currently being cancelled.
remote_token.add_listener(move |reason: &CancelReason, at: Time| {
let listener_event = serde_json::json!({
"phase": "remote_listener_invoked",
"kind": format!("{:?}", reason.kind),
"at_millis": at.as_millis(),
});
tracing::info!(event = %listener_event, "symbol_cancel_lab_checkpoint");
listener_checkpoints.lock().unwrap().push(listener_event);
let mut child_rng = DetRng::new(303);
let child = remote_token_for_listener.child(&mut child_rng);
*late_child_listener.lock().unwrap() = Some(child);
});
remote_broadcaster.register_token(remote_token.clone());
// Step 1: cancel locally with Shutdown at 100ms and broadcast through the local sink.
let local_task = LabRuntimeTarget::spawn(&local_spawn_cx, Budget::INFINITE, {
let local_broadcaster = Arc::clone(&local_broadcaster);
let local_token = local_token.clone();
let checkpoints = Arc::clone(&checkpoints);
async move {
let request = serde_json::json!({
"phase": "local_cancel_requested",
"object_high": object_id.high(),
});
tracing::info!(event = %request, "symbol_cancel_lab_checkpoint");
checkpoints.lock().unwrap().push(request);
let sent = local_broadcaster
.cancel(object_id, &CancelReason::shutdown(), Time::from_millis(100))
.await
.expect("local cancel should broadcast successfully");
let completed = serde_json::json!({
"phase": "local_cancel_completed",
"sent": sent,
});
tracing::info!(event = %completed, "symbol_cancel_lab_checkpoint");
checkpoints.lock().unwrap().push(completed);
local_token.is_cancelled()
}
});
let local_outcome = local_task.await;
crate::assert_with_log!(
matches!(local_outcome, crate::types::Outcome::Ok(true)),
"local cancel task completes successfully",
true,
matches!(local_outcome, crate::types::Outcome::Ok(true))
);
let crate::types::Outcome::Ok(local_cancelled) = local_outcome else {
panic!("local cancel task should finish successfully");
};
// The first message captured by the local sink is what the remote side receives.
let forwarded = local_messages
.lock()
.unwrap()
.first()
.cloned()
.expect("local cancel should emit a broadcast message");
// Step 2: deliver that message to the remote broadcaster at 125ms and report
// the remote state once handle_message has completed.
let remote_task = LabRuntimeTarget::spawn(&remote_spawn_cx, Budget::INFINITE, {
let remote_broadcaster = Arc::clone(&remote_broadcaster);
let remote_token = remote_token.clone();
let remote_child = remote_child.clone();
let late_child = Arc::clone(&late_child);
let checkpoints = Arc::clone(&checkpoints);
async move {
let received = serde_json::json!({
"phase": "remote_handle_started",
"sequence": forwarded.sequence(),
});
tracing::info!(event = %received, "symbol_cancel_lab_checkpoint");
checkpoints.lock().unwrap().push(received);
remote_broadcaster
.handle_message(forwarded, Time::from_millis(125))
.await
.expect("remote handle_message should succeed");
let completed = serde_json::json!({
"phase": "remote_handle_completed",
"forwarded_count": remote_broadcaster.metrics().forwarded,
});
tracing::info!(event = %completed, "symbol_cancel_lab_checkpoint");
checkpoints.lock().unwrap().push(completed);
(
remote_token.is_cancelled(),
remote_child.is_cancelled(),
late_child
.lock()
.unwrap()
.clone()
.expect("late child should be created by remote listener")
.is_cancelled(),
remote_token
.reason()
.expect("remote token should have a reason")
.kind,
remote_broadcaster.metrics(),
)
}
});
let remote_outcome = remote_task.await;
crate::assert_with_log!(
matches!(remote_outcome, crate::types::Outcome::Ok(_)),
"remote handle task completes successfully",
true,
matches!(remote_outcome, crate::types::Outcome::Ok(_))
);
let crate::types::Outcome::Ok((
remote_cancelled,
remote_child_cancelled,
late_child_cancelled,
remote_reason,
remote_metrics,
)) = remote_outcome
else {
panic!("remote handle task should finish successfully");
};
// After cancellation the remote token's children list must be empty, and its
// listener list must hold exactly one entry.
assert_eq!(
remote_token.state.children.read().len(),
0,
"remote cancellation should drain queued children before returning"
);
assert_eq!(
remote_token.state.listeners.read().len(),
1,
"remote cancellation should retain only the original listener before returning"
);
(
local_cancelled,
remote_cancelled,
remote_child_cancelled,
late_child_cancelled,
remote_reason,
remote_metrics,
checkpoints.lock().unwrap().clone(),
)
});
// Outcome checks after the runtime has finished.
assert!(
local_cancelled,
"local token should be cancelled by broadcaster.cancel"
);
assert!(
remote_cancelled,
"remote token should be cancelled by forwarded message"
);
assert!(
remote_child_cancelled,
"remote pre-existing child should be drained during cancellation"
);
assert!(
late_child_cancelled,
"listener-spawned child should be cancelled before handle_message returns"
);
assert_eq!(remote_reason, CancelKind::Shutdown);
assert_eq!(remote_metrics.received, 1);
assert_eq!(remote_metrics.forwarded, 1);
assert!(
checkpoints
.iter()
.any(|event| event["phase"] == "local_broadcast"),
"local broadcast checkpoint should be recorded"
);
assert!(
checkpoints
.iter()
.any(|event| event["phase"] == "remote_listener_invoked"),
"remote listener checkpoint should be recorded"
);
assert!(
checkpoints
.iter()
.any(|event| event["phase"] == "remote_handle_completed"),
"remote completion checkpoint should be recorded"
);
// The lab runtime's oracles must report no invariant violations.
let violations = runtime.oracles.check_all(runtime.now());
assert!(
violations.is_empty(),
"symbol cancel lab-runtime test should leave runtime invariants clean: {violations:?}"
);
}
#[test]
fn test_broadcaster_seen_eviction_is_fifo() {
    // With a capacity of three, inserting sequences 0..4 must evict exactly the oldest
    // entry (sequence 0) and leave sequence 1 at the front of the eviction order.
    let mut broadcaster = CancelBroadcaster::new(NullSink);
    broadcaster.max_seen = 3;
    let object_id = ObjectId::new_for_test(1);
    (0..4).for_each(|seq| {
        broadcaster.mark_seen(object_id, 1, seq);
    });

    let (len, has_seq0, has_seq1, oldest) = {
        let seen = broadcaster.seen_sequences.read();
        let snapshot = (
            seen.set.len(),
            seen.set.contains(&(object_id, 1, 0)),
            seen.set.contains(&(object_id, 1, 1)),
            seen.order.front().copied(),
        );
        drop(seen);
        snapshot
    };

    assert_eq!(len, 3);
    assert!(!has_seq0);
    assert!(has_seq1);
    assert_eq!(oldest, Some((object_id, 1, 1)));
}
#[test]
fn test_cleanup_pending_symbols() {
    // Five 4-byte symbols are queued for one object; a single cleanup must free all of them
    // and leave no pending object behind.
    let coordinator = CleanupCoordinator::new();
    let object_id = ObjectId::new_for_test(1);
    let registered_at = Time::from_millis(100);
    coordinator.register_handler(object_id, CountingCleanupHandler);
    for esi in 0..5 {
        coordinator.register_pending(
            object_id,
            Symbol::new_for_test(1, 0, esi, &[1, 2, 3, 4]),
            registered_at,
        );
    }

    let before = coordinator.stats();
    assert_eq!(before.pending_objects, 1);
    assert_eq!(before.pending_symbols, 5);
    assert_eq!(before.pending_bytes, 20);

    let outcome = coordinator.cleanup(object_id, None);
    assert_eq!(outcome.symbols_cleaned, 5);
    assert_eq!(outcome.bytes_freed, 20);
    assert!(outcome.within_budget);

    let after = coordinator.stats();
    assert_eq!(after.pending_objects, 0);
}
#[test]
fn test_cleanup_within_budget() {
    // A generous poll quota must not be exhausted by cleaning up a single small symbol.
    let coordinator = CleanupCoordinator::new();
    let object_id = ObjectId::new_for_test(1);
    coordinator.register_pending(
        object_id,
        Symbol::new_for_test(1, 0, 0, &[1, 2, 3, 4]),
        Time::from_millis(100),
    );

    let generous = Budget::new().with_poll_quota(1000);
    let outcome = coordinator.cleanup(object_id, Some(generous));
    assert!(outcome.within_budget);
}
#[test]
fn test_cleanup_handler_called() {
    use std::sync::atomic::{AtomicBool, Ordering};

    // Handler that only records that it was invoked and reports zero cleaned symbols.
    struct InvocationFlag {
        called: Arc<AtomicBool>,
    }

    impl CleanupHandler for InvocationFlag {
        fn cleanup(
            &self,
            _object_id: ObjectId,
            _symbols: Vec<Symbol>,
        ) -> crate::error::Result<usize> {
            self.called.store(true, Ordering::SeqCst);
            Ok(0)
        }

        fn name(&self) -> &'static str {
            "test"
        }
    }

    let coordinator = CleanupCoordinator::new();
    let object_id = ObjectId::new_for_test(1);
    let invoked = Arc::new(AtomicBool::new(false));
    coordinator.register_handler(
        object_id,
        InvocationFlag {
            called: Arc::clone(&invoked),
        },
    );
    coordinator.register_pending(
        object_id,
        Symbol::new_for_test(1, 0, 0, &[1, 2]),
        Time::from_millis(100),
    );

    let outcome = coordinator.cleanup(object_id, None);

    assert!(invoked.load(Ordering::SeqCst));
    assert_eq!(outcome.handlers_run, vec!["test"]);
    assert!(outcome.completed);
    assert!(outcome.handler_errors.is_empty());
}
#[test]
fn test_cleanup_with_handler_and_no_symbols_marks_completed() {
    // A registered handler with nothing pending still counts as a successful cleanup: the
    // object is marked completed, the handler is released, and late symbols are refused.
    let coordinator = CleanupCoordinator::new();
    let object_id = ObjectId::new_for_test(10);
    coordinator.register_handler(object_id, CountingCleanupHandler);

    let outcome = coordinator.cleanup(object_id, None);
    assert!(outcome.completed, "empty cleanup should complete");

    let marked_completed = coordinator.completed.read().contains(&object_id);
    assert!(
        marked_completed,
        "successful empty cleanup must mark object completed"
    );
    assert_eq!(
        coordinator
            .handlers
            .read()
            .get(&object_id)
            .map(|handler| handler.name()),
        None,
        "cleanup should drop the registered handler"
    );

    // Anything registered after completion must be rejected.
    let late_symbol = Symbol::new_for_test(10, 0, 0, &[1, 2, 3]);
    coordinator.register_pending(object_id, late_symbol, Time::from_millis(101));

    let stats = coordinator.stats();
    assert_eq!(
        stats.pending_objects, 0,
        "late pending symbols must be rejected after completed empty cleanup"
    );
    assert_eq!(stats.pending_symbols, 0);
}
#[test]
fn test_clear_pending_drops_registered_handler() {
    use std::sync::atomic::{AtomicUsize, Ordering};

    // Handler whose destructor bumps a shared counter, so the test can observe when the
    // coordinator releases it.
    struct DropTracker {
        drops: Arc<AtomicUsize>,
    }

    impl Drop for DropTracker {
        fn drop(&mut self) {
            self.drops.fetch_add(1, Ordering::SeqCst);
        }
    }

    impl CleanupHandler for DropTracker {
        fn cleanup(
            &self,
            _object_id: ObjectId,
            _symbols: Vec<Symbol>,
        ) -> crate::error::Result<usize> {
            Ok(0)
        }

        fn name(&self) -> &'static str {
            "drop-counting"
        }
    }

    let coordinator = CleanupCoordinator::new();
    let object_id = ObjectId::new_for_test(6);
    let drop_count = Arc::new(AtomicUsize::new(0));
    coordinator.register_handler(
        object_id,
        DropTracker {
            drops: Arc::clone(&drop_count),
        },
    );
    coordinator.register_pending(
        object_id,
        Symbol::new_for_test(6, 0, 0, &[1, 2, 3]),
        Time::from_millis(100),
    );
    assert_eq!(coordinator.handlers.read().len(), 1);

    // Clearing the pending set must also drop the handler registered for that object.
    assert_eq!(coordinator.clear_pending(&object_id), Some(1));
    assert_eq!(coordinator.handlers.read().len(), 0);
    assert_eq!(drop_count.load(Ordering::SeqCst), 1);
}
#[test]
fn test_cleanup_handler_error_preserves_retry_state() {
    // When the handler fails, cleanup must report nothing cleaned and must keep the pending
    // symbol set intact so that a later cleanup attempt can retry.
    struct AlwaysFails;

    impl CleanupHandler for AlwaysFails {
        fn cleanup(
            &self,
            _object_id: ObjectId,
            _symbols: Vec<Symbol>,
        ) -> crate::error::Result<usize> {
            Err(crate::error::Error::new(crate::error::ErrorKind::Internal)
                .with_message("cleanup failed"))
        }

        fn name(&self) -> &'static str {
            "failing"
        }
    }

    let coordinator = CleanupCoordinator::new();
    let object_id = ObjectId::new_for_test(7);
    coordinator.register_handler(object_id, AlwaysFails);
    coordinator.register_pending(
        object_id,
        Symbol::new_for_test(7, 0, 0, &[1, 2, 3]),
        Time::from_millis(100),
    );

    let outcome = coordinator.cleanup(object_id, None);
    assert!(
        !outcome.completed,
        "failed handler must not report completion"
    );
    assert_eq!(
        outcome.symbols_cleaned, 0,
        "failed cleanup must not report cleaned symbols"
    );
    assert_eq!(
        outcome.bytes_freed, 0,
        "failed cleanup must not report freed bytes"
    );
    assert_eq!(outcome.handlers_run, vec!["failing"]);
    assert_eq!(outcome.handler_errors.len(), 1);
    let first_error = &outcome.handler_errors[0];
    assert!(first_error.contains("cleanup failed"), "{}", first_error);

    let stats = coordinator.stats();
    assert_eq!(
        stats.pending_objects, 1,
        "failed cleanup must remain retryable"
    );
    assert_eq!(stats.pending_symbols, 1);
    assert_eq!(stats.pending_bytes, 3);
}
#[test]
// Lock-order regression test: `restore_retry_state` must take the `handlers` lock before it
// blocks on the `pending` lock, matching the handlers->pending order used elsewhere, so the
// two paths cannot deadlock in an AB-BA cycle.
fn restore_retry_state_acquires_handler_table_before_pending_state() {
use std::sync::Barrier;
use std::time::{Duration, Instant};
let coordinator = Arc::new(CleanupCoordinator::new());
let object_id = ObjectId::new_for_test(70);
// One 3-byte symbol to hand back to the coordinator as restored retry state.
let pending_set = PendingSymbolSet {
symbols: vec![Symbol::new_for_test(70, 0, 0, &[1, 2, 3])],
total_bytes: 3,
_created_at: Time::from_millis(100),
};
// Hold `pending` for writing so the restoring thread blocks as soon as it needs that lock.
let pending_guard = coordinator.pending.write();
// The barrier only ensures the restoring thread has started before the probe loop begins.
let started = Arc::new(Barrier::new(2));
let restore_started = Arc::clone(&started);
let restore_coordinator = Arc::clone(&coordinator);
let handle = std::thread::spawn(move || {
restore_started.wait();
restore_coordinator.restore_retry_state(
object_id,
Box::new(CountingCleanupHandler),
pending_set,
);
});
started.wait();
// Probe for up to one second. A failed `try_write` on `handlers` means another holder (the
// restoring thread, since this test takes no other `handlers` lock) has it while it waits on
// `pending`. A successful probe releases its guard at once, because the guard is a temporary.
let mut saw_handler_table_locked = false;
let deadline = Instant::now() + Duration::from_secs(1);
while Instant::now() < deadline {
if coordinator.handlers.try_write().is_none() {
saw_handler_table_locked = true;
break;
}
std::thread::yield_now();
}
// Release `pending` so the restoring thread can finish, whatever the probe observed.
drop(pending_guard);
handle
.join()
.expect("retry-state restoration thread should finish");
assert!(
saw_handler_table_locked,
"restore_retry_state must acquire handlers before waiting for pending; \
otherwise a handlers->pending caller can form an AB-BA lock cycle"
);
// Once unblocked, the restoration must have installed both the handler and the symbols.
assert!(
coordinator.handlers.read().contains_key(&object_id),
"retry restoration should preserve the cleanup handler"
);
assert_eq!(
coordinator.stats().pending_symbols,
1,
"retry restoration should preserve pending symbols"
);
}
#[test]
fn test_cleanup_handler_error_reopens_object_for_new_pending_symbols() {
    // After a failed cleanup the object stays open, so newly arriving symbols are added to
    // the retained pending set instead of being rejected.
    struct AlwaysFails;

    impl CleanupHandler for AlwaysFails {
        fn cleanup(
            &self,
            _object_id: ObjectId,
            _symbols: Vec<Symbol>,
        ) -> crate::error::Result<usize> {
            Err(crate::error::Error::new(crate::error::ErrorKind::Internal)
                .with_message("cleanup failed"))
        }

        fn name(&self) -> &'static str {
            "failing"
        }
    }

    let coordinator = CleanupCoordinator::new();
    let object_id = ObjectId::new_for_test(8);
    coordinator.register_handler(object_id, AlwaysFails);
    coordinator.register_pending(
        object_id,
        Symbol::new_for_test(8, 0, 0, &[1, 2, 3]),
        Time::from_millis(100),
    );

    let outcome = coordinator.cleanup(object_id, None);
    assert!(
        !outcome.completed,
        "failed cleanup must leave object retryable"
    );

    let late_symbol = Symbol::new_for_test(8, 0, 1, &[4, 5]);
    coordinator.register_pending(object_id, late_symbol, Time::from_millis(101));

    let stats = coordinator.stats();
    assert_eq!(
        stats.pending_symbols, 2,
        "retryable cleanup must continue accepting pending symbols"
    );
    assert_eq!(stats.pending_bytes, 5);
}
#[test]
fn test_cleanup_budget_exhaustion_reopens_object_for_new_pending_symbols() {
    // A zero poll quota stops cleanup before completion; the object must stay open and keep
    // accepting new pending symbols.
    struct SucceedsWithOne;

    impl CleanupHandler for SucceedsWithOne {
        fn cleanup(
            &self,
            _object_id: ObjectId,
            _symbols: Vec<Symbol>,
        ) -> crate::error::Result<usize> {
            Ok(1)
        }

        fn name(&self) -> &'static str {
            "recording"
        }
    }

    let coordinator = CleanupCoordinator::new();
    let object_id = ObjectId::new_for_test(9);
    coordinator.register_handler(object_id, SucceedsWithOne);
    coordinator.register_pending(
        object_id,
        Symbol::new_for_test(9, 0, 0, &[1]),
        Time::from_millis(100),
    );

    let exhausted = Budget::new().with_poll_quota(0);
    let outcome = coordinator.cleanup(object_id, Some(exhausted));
    assert!(
        !outcome.completed,
        "budget-exhausted cleanup must leave object retryable"
    );
    assert!(
        !outcome.within_budget,
        "zero-poll budget should report budget exhaustion"
    );

    let late_symbol = Symbol::new_for_test(9, 0, 1, &[2, 3]);
    coordinator.register_pending(object_id, late_symbol, Time::from_millis(101));

    let stats = coordinator.stats();
    assert_eq!(
        stats.pending_symbols, 2,
        "budget-exhausted cleanup must continue accepting pending symbols"
    );
    assert_eq!(stats.pending_bytes, 3);
}
#[test]
fn test_cleanup_handler_invoked_without_holding_handler_lock() {
    use std::sync::atomic::{AtomicBool, Ordering};

    // The handler probes the coordinator's handler table from inside its own callback. A
    // successful `try_write` proves that cleanup released the handlers lock before calling it.
    struct LockProbe {
        coordinator: Arc<CleanupCoordinator>,
        write_lock_available: Arc<AtomicBool>,
    }

    impl CleanupHandler for LockProbe {
        fn cleanup(
            &self,
            _object_id: ObjectId,
            _symbols: Vec<Symbol>,
        ) -> crate::error::Result<usize> {
            let writable = self.coordinator.handlers.try_write().is_some();
            self.write_lock_available.store(writable, Ordering::SeqCst);
            Ok(0)
        }

        fn name(&self) -> &'static str {
            "lock-check"
        }
    }

    let coordinator = Arc::new(CleanupCoordinator::new());
    let object_id = ObjectId::new_for_test(99);
    let probe_result = Arc::new(AtomicBool::new(false));
    let probe = LockProbe {
        coordinator: Arc::clone(&coordinator),
        write_lock_available: Arc::clone(&probe_result),
    };
    coordinator.register_handler(object_id, probe);
    coordinator.register_pending(
        object_id,
        Symbol::new_for_test(99, 0, 0, &[1]),
        Time::from_millis(100),
    );

    let _ = coordinator.cleanup(object_id, None);

    assert!(
        probe_result.load(Ordering::SeqCst),
        "cleanup handler callback should execute without handlers lock held"
    );
}
#[test]
fn test_cleanup_stats_accurate() {
    // Stats must track per-object symbol and byte totals across registration and clearing.
    let coordinator = CleanupCoordinator::new();
    let at = Time::from_millis(100);
    let snapshot = || {
        let stats = coordinator.stats();
        (stats.pending_objects, stats.pending_symbols, stats.pending_bytes)
    };

    assert_eq!(snapshot(), (0, 0, 0));

    let obj1 = ObjectId::new_for_test(1);
    let obj2 = ObjectId::new_for_test(2);
    coordinator.register_pending(obj1, Symbol::new_for_test(1, 0, 0, &[1, 2, 3]), at);
    coordinator.register_pending(obj1, Symbol::new_for_test(1, 0, 1, &[4, 5, 6]), at);
    coordinator.register_pending(obj2, Symbol::new_for_test(2, 0, 0, &[7, 8]), at);

    // obj1: two 3-byte symbols; obj2: one 2-byte symbol.
    assert_eq!(snapshot(), (2, 3, 8));

    coordinator.clear_pending(&obj1);
    assert_eq!(snapshot(), (1, 1, 2));
}
#[test]
fn test_grandchild_inherits_cancellation() {
    // Cancelling the root must reach two levels down, with each hop recorded as ParentCancelled
    // and the original User reason at the end of the chain.
    let mut rng = DetRng::new(42);
    let grandparent = SymbolCancelToken::new(ObjectId::new_for_test(1), &mut rng);
    let parent = grandparent.child(&mut rng);
    let grandchild = parent.child(&mut rng);
    assert!(!grandchild.is_cancelled());

    grandparent.cancel(&CancelReason::user("cascade"), Time::from_millis(100));

    assert!(parent.is_cancelled());
    assert!(grandchild.is_cancelled());
    assert_eq!(grandchild.reason().unwrap().kind, CancelKind::ParentCancelled);
    let expected_chain = vec![
        CancelKind::ParentCancelled,
        CancelKind::ParentCancelled,
        CancelKind::User,
    ];
    assert_eq!(
        reason_chain_kinds(&grandchild),
        expected_chain,
        "grandchild cancellation should validate the full parent chain"
    );
}
#[test]
fn test_cancel_drains_children_and_late_child_is_not_queued() {
    // Cancelling a parent must cancel and drain its queued children. A child created
    // afterwards is born cancelled and never enters the children queue.
    let mut rng = DetRng::new(7);
    let parent = SymbolCancelToken::new(ObjectId::new_for_test(5), &mut rng);
    let queued_children = || parent.state.children.read().len();

    let first = parent.child(&mut rng);
    let second = parent.child(&mut rng);
    assert_eq!(
        queued_children(),
        2,
        "precondition: both children should be queued under parent"
    );

    let triggered = parent.cancel(&CancelReason::user("drain"), Time::from_millis(100));
    assert!(triggered, "first caller should trigger cancellation");
    assert!(first.is_cancelled(), "queued child A must be cancelled");
    assert!(second.is_cancelled(), "queued child B must be cancelled");
    assert_eq!(
        queued_children(),
        0,
        "children vector must be drained after parent cancel"
    );

    let late_child = parent.child(&mut rng);
    assert!(
        late_child.is_cancelled(),
        "late child should be cancelled immediately when parent already cancelled"
    );
    assert_eq!(
        queued_children(),
        0,
        "late child should not be retained in parent children vector"
    );
}
#[test]
fn test_listener_spawned_child_is_drained_inline() {
    // A listener that creates a child token while the parent is being cancelled must get a
    // child that is already cancelled, carries the parent's reason chain and timestamp, and is
    // not left queued on the parent.
    let mut rng = DetRng::new(91);
    let parent = SymbolCancelToken::new(ObjectId::new_for_test(6), &mut rng);
    let slot = Arc::new(std::sync::Mutex::new(None::<SymbolCancelToken>));

    {
        let slot = Arc::clone(&slot);
        let parent_for_listener = parent.clone();
        parent.add_listener(move |_: &CancelReason, _: Time| {
            let mut child_rng = DetRng::new(92);
            let spawned = parent_for_listener.child(&mut child_rng);
            *slot.lock().unwrap() = Some(spawned);
        });
    }

    let cancelled_at = Time::from_millis(150);
    let triggered = parent.cancel(&CancelReason::user("listener-child"), cancelled_at);
    assert!(triggered, "first caller should trigger cancellation");

    let late_child = slot
        .lock()
        .unwrap()
        .clone()
        .expect("listener should create a child during cancellation");
    assert!(
        late_child.is_cancelled(),
        "child created during listener callback must be cancelled before cancel() returns"
    );
    assert_eq!(
        late_child.reason().unwrap().kind,
        CancelKind::ParentCancelled,
        "late child should inherit parent-cancelled semantics"
    );
    assert_eq!(
        reason_chain_kinds(&late_child),
        vec![CancelKind::ParentCancelled, CancelKind::User],
        "late child created inside listener should retain the parent reason as a cause"
    );
    assert_eq!(
        late_child.cancelled_at(),
        Some(cancelled_at),
        "late child should observe the parent cancellation timestamp"
    );
    assert_eq!(
        parent.state.children.read().len(),
        0,
        "listener-spawned child must not be retained after drain completes"
    );
}
#[test]
fn test_listener_registered_during_cancel_not_requeued() {
    // The original listener registers a fresh "late" listener every time it runs during a
    // drain. A late listener must be invoked inline, exactly once per drain, and must never be
    // retained in the token's listener list. Only the original listener survives each drain,
    // so a later strengthening re-runs the original, which in turn spawns and notifies
    // exactly one new late listener.
    let mut rng = DetRng::new(93);
    let token = SymbolCancelToken::new(ObjectId::new_for_test(7), &mut rng);
    let notification_count = Arc::new(std::sync::atomic::AtomicUsize::new(0));
    let seen_kind = Arc::new(std::sync::Mutex::new(None::<CancelKind>));
    let seen_time = Arc::new(std::sync::Mutex::new(None::<Time>));
    let token_for_listener = token.clone();
    let notification_count_clone = Arc::clone(&notification_count);
    let seen_kind_clone = Arc::clone(&seen_kind);
    let seen_time_clone = Arc::clone(&seen_time);
    token.add_listener(move |_: &CancelReason, _: Time| {
        // Listeners must be `Fn`, so every invocation clones the shared state into the
        // late listener it registers.
        token_for_listener.add_listener({
            let notification_count_clone = Arc::clone(&notification_count_clone);
            let seen_kind_clone = Arc::clone(&seen_kind_clone);
            let seen_time_clone = Arc::clone(&seen_time_clone);
            move |reason: &CancelReason, at: Time| {
                notification_count_clone.fetch_add(1, Ordering::SeqCst);
                *seen_kind_clone.lock().unwrap() = Some(reason.kind);
                *seen_time_clone.lock().unwrap() = Some(at);
            }
        });
    });

    let now = Time::from_millis(175);
    assert!(
        token.cancel(&CancelReason::timeout(), now),
        "first caller should trigger listener drain"
    );
    assert_eq!(
        notification_count.load(Ordering::SeqCst),
        1,
        "listener registered during cancellation should be invoked inline exactly once"
    );
    assert_eq!(
        *seen_kind.lock().unwrap(),
        Some(CancelKind::Timeout),
        "late listener should observe the current cancellation kind"
    );
    assert_eq!(
        *seen_time.lock().unwrap(),
        Some(now),
        "late listener should observe the current cancellation timestamp"
    );
    assert_eq!(
        token.state.listeners.read().len(),
        1,
        "the original retained listener remains, but the late listener must not be queued"
    );

    // Strengthen to Shutdown. The re-notification still reports the first-cancel timestamp.
    token.cancel(&CancelReason::shutdown(), Time::from_millis(200));
    assert_eq!(
        notification_count.load(Ordering::SeqCst),
        2,
        "the retained original listener should run again on strengthen and self-notify one late listener"
    );
    assert_eq!(
        *seen_kind.lock().unwrap(),
        Some(CancelKind::Shutdown),
        "late listener should observe the strengthened cancellation kind"
    );
    assert_eq!(
        *seen_time.lock().unwrap(),
        Some(now),
        "late listener should observe the canonical first-cancel timestamp after strengthen"
    );
    assert_eq!(
        token.state.listeners.read().len(),
        1,
        "strengthened cancellations retain only the original listener"
    );
}
#[test]
fn test_listener_registered_during_cancel_can_spawn_child_without_leak() {
    use std::sync::atomic::AtomicUsize;

    // What the late listener observed while it ran inside the cancellation drain. The
    // observations are asserted after `cancel()` returns rather than inside the callback.
    // NOTE(review): the token tracks `listener_panic_count`, which suggests listener panics
    // are isolated. An assertion failing inside the callback would then surface only as a
    // misleading "no child spawned" failure.
    struct LateObservation {
        reason_kind: CancelKind,
        at: Time,
        child: SymbolCancelToken,
        child_cancelled_inline: bool,
        child_reason_kind: Option<CancelKind>,
        child_cancelled_at: Option<Time>,
    }

    let mut rng = DetRng::new(94);
    let token = SymbolCancelToken::new(ObjectId::new_for_test(8), &mut rng);
    let observation = Arc::new(std::sync::Mutex::new(None::<LateObservation>));
    let child_notification_count = Arc::new(AtomicUsize::new(0));

    {
        let observation = Arc::clone(&observation);
        let child_notification_count = Arc::clone(&child_notification_count);
        let token_for_listener = token.clone();
        token.add_listener(move |_: &CancelReason, _: Time| {
            // Runs during the drain and registers a late listener. The late listener spawns
            // a child of the cancelling token and records what it saw at that moment.
            let observation = Arc::clone(&observation);
            let child_notification_count = Arc::clone(&child_notification_count);
            let parent = token_for_listener.clone();
            token_for_listener.add_listener(move |reason: &CancelReason, at: Time| {
                child_notification_count.fetch_add(1, Ordering::SeqCst);
                let mut child_rng = DetRng::new(95);
                let child = parent.child(&mut child_rng);
                let recorded = LateObservation {
                    reason_kind: reason.kind,
                    at,
                    child_cancelled_inline: child.is_cancelled(),
                    child_reason_kind: child.reason().map(|r| r.kind),
                    child_cancelled_at: child.cancelled_at(),
                    child,
                };
                *observation.lock().unwrap() = Some(recorded);
            });
        });
    }

    let now = Time::from_millis(250);
    assert!(
        token.cancel(&CancelReason::shutdown(), now),
        "first caller should trigger cancellation"
    );

    let observed = observation
        .lock()
        .unwrap()
        .take()
        .expect("late listener should have spawned a child");
    assert_eq!(
        child_notification_count.load(Ordering::SeqCst),
        1,
        "late listener should run exactly once during drain"
    );
    assert!(
        observed.child_cancelled_inline,
        "child created from a late listener must be cancelled inline"
    );
    assert_eq!(
        observed.child_reason_kind,
        Some(CancelKind::ParentCancelled),
        "late child should inherit parent-cancelled semantics"
    );
    assert_eq!(
        observed.child_cancelled_at,
        Some(observed.at),
        "late child should observe the current cancellation timestamp"
    );
    assert_eq!(
        observed.reason_kind,
        CancelKind::Shutdown,
        "late listener should observe the active cancellation reason"
    );
    assert!(
        observed.child.is_cancelled(),
        "spawned child must remain cancelled"
    );
    assert_eq!(
        observed.child.cancelled_at(),
        Some(now),
        "spawned child should be cancelled before cancel() returns"
    );
    assert_eq!(
        token.state.listeners.read().len(),
        1,
        "drain must retain only the original listener, not the late listener"
    );
    assert_eq!(
        token.state.children.read().len(),
        0,
        "drain must leave no late children queued"
    );
}
#[test]
fn test_child_cancel_does_not_propagate_upward() {
    // Cancellation flows from parent to child only; cancelling a child leaves its parent live.
    let mut rng = DetRng::new(42);
    let parent = SymbolCancelToken::new(ObjectId::new_for_test(1), &mut rng);
    let child = parent.child(&mut rng);

    child.cancel(&CancelReason::user("child only"), Time::from_millis(100));

    assert!(child.is_cancelled());
    assert!(!parent.is_cancelled());
}
#[test]
fn test_cancel_strengthens_reason() {
    // A later, more severe reason upgrades the recorded kind, but it does not count as a new
    // cancellation and it keeps the first timestamp.
    let mut rng = DetRng::new(42);
    let cancel_handle = SymbolCancelToken::new(ObjectId::new_for_test(1), &mut rng);
    let first_at = Time::from_millis(100);

    let won_first = cancel_handle.cancel(&CancelReason::user("first"), first_at);
    assert!(won_first);

    let won_second = cancel_handle.cancel(
        &CancelReason::new(CancelKind::Shutdown),
        Time::from_millis(200),
    );
    assert!(!won_second);

    assert_eq!(cancel_handle.reason().unwrap().kind, CancelKind::Shutdown);
    assert_eq!(cancel_handle.cancelled_at(), Some(first_at));
}
#[test]
fn test_cancel_does_not_weaken_reason() {
    // Once Shutdown is recorded, a milder User cancellation must leave the kind unchanged.
    let mut rng = DetRng::new(42);
    let cancel_handle = SymbolCancelToken::new(ObjectId::new_for_test(1), &mut rng);

    let won_first = cancel_handle.cancel(
        &CancelReason::new(CancelKind::Shutdown),
        Time::from_millis(100),
    );
    assert!(won_first);

    let won_second = cancel_handle.cancel(&CancelReason::user("gentle"), Time::from_millis(200));
    assert!(!won_second);

    assert_eq!(cancel_handle.reason().unwrap().kind, CancelKind::Shutdown);
}
#[test]
fn test_multiple_listeners_all_notified() {
    use std::sync::atomic::{AtomicU32, Ordering};

    // Every registered listener fires exactly once on the first cancellation.
    let mut rng = DetRng::new(42);
    let cancel_handle = SymbolCancelToken::new(ObjectId::new_for_test(1), &mut rng);
    let fired = Arc::new(AtomicU32::new(0));

    (0..3).for_each(|_| {
        let counter = Arc::clone(&fired);
        cancel_handle.add_listener(move |_: &CancelReason, _: Time| {
            counter.fetch_add(1, Ordering::SeqCst);
        });
    });

    cancel_handle.cancel(&CancelReason::timeout(), Time::from_millis(100));
    assert_eq!(fired.load(Ordering::SeqCst), 3);
}
#[test]
fn test_cleanup_multiple_objects_independent() {
    // Cleaning one object must not touch another object's pending symbols.
    let coordinator = CleanupCoordinator::new();
    let at = Time::from_millis(100);
    let obj1 = ObjectId::new_for_test(1);
    let obj2 = ObjectId::new_for_test(2);
    coordinator.register_handler(obj1, CountingCleanupHandler);

    // obj1: three 2-byte symbols; obj2: two 3-byte symbols.
    for esi in 0..3 {
        coordinator.register_pending(obj1, Symbol::new_for_test(1, 0, esi, &[1, 2]), at);
    }
    for esi in 0..2 {
        coordinator.register_pending(obj2, Symbol::new_for_test(2, 0, esi, &[3, 4, 5]), at);
    }

    let before = coordinator.stats();
    assert_eq!(before.pending_objects, 2);
    assert_eq!(before.pending_symbols, 5);

    let outcome = coordinator.cleanup(obj1, None);
    assert_eq!(outcome.symbols_cleaned, 3);
    assert_eq!(outcome.bytes_freed, 6);

    let after = coordinator.stats();
    assert_eq!(after.pending_objects, 1);
    assert_eq!(after.pending_symbols, 2);
    assert_eq!(after.pending_bytes, 6);
}
#[test]
fn test_token_serialization_roundtrip_deterministic() {
    // Re-encoding a decoded token must reproduce the original bytes exactly, and the decoded
    // token must carry the same identity as its source.
    let mut rng = DetRng::new(99);
    let object = ObjectId::new(0xdead_beef_cafe_babe, 0x1234_5678_9abc_def0);
    let source = SymbolCancelToken::new(object, &mut rng);

    let encoded = source.to_bytes();
    let decoded = SymbolCancelToken::from_bytes(&encoded).unwrap();
    let reencoded = decoded.to_bytes();

    assert_eq!(encoded, reencoded, "serialization must be deterministic");
    assert_eq!(decoded.token_id(), source.token_id());
    assert_eq!(decoded.object_id(), source.object_id());
}
#[test]
fn test_message_forwarding_exhausts_at_zero_hops() {
    // With a hop budget of zero the message is terminal and cannot be forwarded further.
    let terminal = CancelMessage::new(
        1,
        ObjectId::new_for_test(1),
        CancelKind::User,
        Time::from_millis(100),
        0,
    )
    .with_max_hops(0);

    assert!(!terminal.can_forward());
    assert!(terminal.forwarded().is_none());
}
#[test]
fn test_broadcaster_separate_tokens_independent() {
    // Messages for distinct tokens are both accepted; neither is mistaken for a duplicate.
    let broadcaster = CancelBroadcaster::new(NullSink);
    let user_msg = CancelMessage::new(
        1,
        ObjectId::new_for_test(1),
        CancelKind::User,
        Time::from_millis(100),
        0,
    );
    let timeout_msg = CancelMessage::new(
        2,
        ObjectId::new_for_test(2),
        CancelKind::Timeout,
        Time::from_millis(200),
        0,
    );
    let received_at = Time::from_millis(100);

    let outcomes = [
        broadcaster.receive_message(&user_msg, received_at),
        broadcaster.receive_message(&timeout_msg, received_at),
    ];
    for outcome in &outcomes {
        assert!(outcome.is_some());
    }

    let metrics = broadcaster.metrics();
    assert_eq!(metrics.received, 2);
    assert_eq!(metrics.duplicates, 0);
}
#[test]
fn meta_transitive_cascade_property() {
    // Two identically seeded four-level chains must end up in identical cancellation states.
    // The deepest descendant must record one ParentCancelled hop per level plus the root cause.
    let build_chain = |rng: &mut DetRng| {
        let root = SymbolCancelToken::new(ObjectId::new_for_test(1), rng);
        let level1 = root.child(rng);
        let level2 = level1.child(rng);
        let level3 = level2.child(rng);
        (root, level1, level2, level3)
    };

    let mut rng = DetRng::new(12345);
    let (root, level1, level2, level3) = build_chain(&mut rng);
    let mut ref_rng = DetRng::new(12345);
    let (ref_root, ref_level1, ref_level2, ref_level3) = build_chain(&mut ref_rng);

    let at = Time::from_millis(500);
    root.cancel(&CancelReason::user("cascade_test"), at);
    ref_root.cancel(&CancelReason::user("cascade_test"), at);

    assert_eq!(root.is_cancelled(), ref_root.is_cancelled());
    assert_eq!(level1.is_cancelled(), ref_level1.is_cancelled());
    assert_eq!(level2.is_cancelled(), ref_level2.is_cancelled());
    assert_eq!(level3.is_cancelled(), ref_level3.is_cancelled());

    assert_eq!(root.reason().unwrap().kind, CancelKind::User);
    for descendant in [&level1, &level2, &level3] {
        assert_eq!(descendant.reason().unwrap().kind, CancelKind::ParentCancelled);
    }
    assert_eq!(
        reason_chain_kinds(&level3),
        vec![
            CancelKind::ParentCancelled,
            CancelKind::ParentCancelled,
            CancelKind::ParentCancelled,
            CancelKind::User,
        ],
        "deep descendant should retain every parent-cancelled hop plus the root cause"
    );
}
#[test]
fn meta_order_independence_cascade() {
    // Two families are built from the same seed. The second creates its children in a
    // different order (c before b). Cancelling both parents with the same reason must give
    // the corresponding children identical cancellation state and reason kinds.
    let mut rng1 = DetRng::new(67890);
    let parent1 = SymbolCancelToken::new(ObjectId::new_for_test(10), &mut rng1);
    let child1a = parent1.child(&mut rng1);
    let child1b = parent1.child(&mut rng1);
    let child1c = parent1.child(&mut rng1);

    let mut rng2 = DetRng::new(67890);
    let parent2 = SymbolCancelToken::new(ObjectId::new_for_test(10), &mut rng2);
    let child2a = parent2.child(&mut rng2);
    let child2c = parent2.child(&mut rng2);
    let child2b = parent2.child(&mut rng2);

    let now = Time::from_millis(1000);
    parent1.cancel(&CancelReason::timeout(), now);
    parent2.cancel(&CancelReason::timeout(), now);

    assert_eq!(parent1.is_cancelled(), parent2.is_cancelled());
    for (left, right) in [(&child1a, &child2a), (&child1b, &child2b), (&child1c, &child2c)] {
        assert_eq!(left.is_cancelled(), right.is_cancelled());
    }
    for (left, right) in [(&child1a, &child2a), (&child1b, &child2b), (&child1c, &child2c)] {
        assert_eq!(left.reason().unwrap().kind, right.reason().unwrap().kind);
    }
}
#[test]
fn meta_reason_monotonicity_cascade() {
    // The recorded kind only ever moves toward greater severity. Repeated User cancellations
    // must leave it at exactly User. Applying Timeout then Shutdown must end at Shutdown.
    // The first-cancel timestamp never changes along the way.
    let mut rng = DetRng::new(11111);
    let token = SymbolCancelToken::new(ObjectId::new_for_test(20), &mut rng);
    let weak_reasons = [CancelReason::user("weak1"), CancelReason::user("weak2")];
    let strong_reasons = [
        CancelReason::timeout(),
        CancelReason::new(CancelKind::Shutdown),
    ];
    let now = Time::from_millis(2000);

    for reason in &weak_reasons {
        token.cancel(reason, now);
    }
    let after_weak = token.reason().unwrap().kind;
    assert_eq!(
        after_weak,
        CancelKind::User,
        "weak-only cancellations must record exactly the User kind"
    );

    for reason in &strong_reasons {
        token.cancel(reason, now);
    }
    let after_strong = token.reason().unwrap().kind;
    assert_eq!(after_strong, CancelKind::Shutdown);
    assert_eq!(
        token.cancelled_at(),
        Some(now),
        "strengthening must keep the first-cancel timestamp"
    );
}
#[test]
fn meta_repeat_cancel_matches_single_cancel_observable_state() {
    // Two identically seeded token trees are built. One is cancelled once; the other is
    // cancelled four times with the same reason. Every token snapshot and the listener drain
    // order must match.
    let build_fixture = || {
        let mut rng = DetRng::new(16_777_216);
        let root = SymbolCancelToken::new(ObjectId::new_for_test(21), &mut rng);
        let child_a = root.child(&mut rng);
        let child_b = root.child(&mut rng);
        let grandchild = child_a.child(&mut rng);
        let order = Arc::new(StdMutex::new(Vec::new()));
        for token in [&root, &child_a, &child_b, &grandchild] {
            attach_order_listener(token, &order);
        }
        (root, child_a, child_b, grandchild, order)
    };

    let (once_root, once_child_a, once_child_b, once_grandchild, once_order) = build_fixture();
    let (repeated_root, repeated_child_a, repeated_child_b, repeated_grandchild, repeated_order) =
        build_fixture();

    let reason = CancelReason::timeout();
    let now = Time::from_millis(2_500);
    assert!(
        once_root.cancel(&reason, now),
        "first cancellation should win for single-cancel fixture"
    );
    assert!(
        repeated_root.cancel(&reason, now),
        "first cancellation should win for repeated-cancel fixture"
    );
    for _ in 0..3 {
        assert!(
            !repeated_root.cancel(&reason, now),
            "subsequent identical cancellations must be idempotent"
        );
    }

    let pairs = [
        (&once_root, &repeated_root),
        (&once_child_a, &repeated_child_a),
        (&once_child_b, &repeated_child_b),
        (&once_grandchild, &repeated_grandchild),
    ];
    for (once, repeated) in pairs {
        assert_eq!(snapshot_token(once), snapshot_token(repeated));
    }
    assert_eq!(
        *once_order.lock().unwrap(),
        *repeated_order.lock().unwrap(),
        "identical repeated cancellations must not perturb drain order"
    );
}
#[test]
fn meta_upward_isolation_property() {
    // Cancelling one child must leave its parent and siblings exactly as they were.
    let mut rng = DetRng::new(22222);
    let parent = SymbolCancelToken::new(ObjectId::new_for_test(30), &mut rng);
    let child_a = parent.child(&mut rng);
    let child_b = parent.child(&mut rng);
    let child_c = parent.child(&mut rng);

    let before = (
        parent.is_cancelled(),
        child_b.is_cancelled(),
        child_c.is_cancelled(),
    );

    child_a.cancel(&CancelReason::user("isolated"), Time::from_millis(3000));

    assert_eq!(parent.is_cancelled(), before.0);
    assert_eq!(child_b.is_cancelled(), before.1);
    assert_eq!(child_c.is_cancelled(), before.2);
    assert!(child_a.is_cancelled());
    assert!(!parent.is_cancelled());
    assert!(!child_b.is_cancelled());
    assert!(!child_c.is_cancelled());
}
#[test]
fn meta_sibling_subtrees_are_isolated_from_local_parent_cancel() {
    // Cancelling branch A must cascade into A's leaf only. The shared root and the whole of
    // branch B must remain untouched.
    let mut rng = DetRng::new(22_223);
    let root = SymbolCancelToken::new(ObjectId::new_for_test(31), &mut rng);
    let branch_a = root.child(&mut rng);
    let branch_b = root.child(&mut rng);
    let leaf_a = branch_a.child(&mut rng);
    let leaf_b = branch_b.child(&mut rng);

    branch_a.cancel(
        &CancelReason::user("branch_a_only"),
        Time::from_millis(3_100),
    );

    // Cancelled subtree.
    assert!(
        branch_a.is_cancelled(),
        "the locally cancelled subtree root must be cancelled"
    );
    assert!(
        leaf_a.is_cancelled(),
        "descendants of the locally cancelled subtree must cascade"
    );
    // Everything outside it.
    assert!(
        !root.is_cancelled(),
        "local subtree cancellation must not bubble up to the shared root"
    );
    assert!(
        !branch_b.is_cancelled(),
        "sibling subtree root must remain untouched"
    );
    assert!(
        !leaf_b.is_cancelled(),
        "sibling subtree descendants must remain untouched"
    );

    assert_eq!(branch_a.reason().unwrap().kind, CancelKind::User);
    assert_eq!(leaf_a.reason().unwrap().kind, CancelKind::ParentCancelled);
    assert!(branch_b.reason().is_none());
    assert!(leaf_b.reason().is_none());
}
#[test]
fn meta_listener_multiplicativity() {
    use std::sync::atomic::{AtomicU32, Ordering};

    // Every listener fires once for the first cancellation and once more for a strengthening
    // one. A repeat at the same severity fires none of them.
    let mut rng = DetRng::new(33333);
    let token = SymbolCancelToken::new(ObjectId::new_for_test(40), &mut rng);
    let listener_count = 5u32;
    let notifications = Arc::new(AtomicU32::new(0));
    for _ in 0..listener_count {
        let counter = Arc::clone(&notifications);
        token.add_listener(move |_: &CancelReason, _: Time| {
            counter.fetch_add(1, Ordering::SeqCst);
        });
    }
    let observed = || notifications.load(Ordering::SeqCst);

    token.cancel(&CancelReason::timeout(), Time::from_millis(4000));
    let after_first = observed();
    assert_eq!(after_first, listener_count);

    token.cancel(
        &CancelReason::new(CancelKind::Shutdown),
        Time::from_millis(5000),
    );
    let after_second = observed();
    assert_eq!(after_first, listener_count);
    assert_eq!(after_second, listener_count * 2);

    token.cancel(
        &CancelReason::new(CancelKind::Shutdown),
        Time::from_millis(6000),
    );
    assert_eq!(observed(), after_second);
}
#[test]
fn meta_broadcast_deduplication_invariant() {
    // Delivering the exact same message five times must process it once and
    // classify the remaining four deliveries as duplicates.
    const DELIVERIES: usize = 5;
    let now = Time::from_millis(6000);
    let broadcaster = CancelBroadcaster::new(NullSink);
    let msg = CancelMessage::new(
        12345,
        ObjectId::new_for_test(50),
        CancelKind::Timeout,
        Time::from_millis(6000),
        777,
    );

    let mut outcomes = Vec::with_capacity(DELIVERIES);
    for _ in 0..DELIVERIES {
        outcomes.push(broadcaster.receive_message(&msg, now));
    }
    assert!(outcomes[0].is_some(), "first message should be processed");
    assert!(
        outcomes[1..].iter().all(Option::is_none),
        "subsequent messages should be duplicates"
    );

    let metrics = broadcaster.metrics();
    assert_eq!(
        metrics.received, 1,
        "only one message should be counted as received"
    );
    assert_eq!(metrics.duplicates, 4, "four duplicates should be detected");
}
#[test]
fn meta_cascade_depth_invariance() {
    // Metamorphic relation: cascade reach does not depend on tree shape.
    // A wide tree (root + 3 direct children) and a deep tree (root + a
    // 3-level chain), built from the same seed, are cancelled with the same
    // reason; every descendant in either tree must end up cancelled and
    // tagged `ParentCancelled`.
    let mut wide_rng = DetRng::new(44444);
    let wide_root = SymbolCancelToken::new(ObjectId::new_for_test(60), &mut wide_rng);
    let wide_children: Vec<_> = (0..3).map(|_| wide_root.child(&mut wide_rng)).collect();

    let mut deep_rng = DetRng::new(44444);
    let deep_root = SymbolCancelToken::new(ObjectId::new_for_test(60), &mut deep_rng);
    let depth_1 = deep_root.child(&mut deep_rng);
    let depth_2 = depth_1.child(&mut deep_rng);
    let depth_3 = depth_2.child(&mut deep_rng);

    let now = Time::from_millis(7000);
    wide_root.cancel(&CancelReason::new(CancelKind::Deadline), now);
    deep_root.cancel(&CancelReason::new(CancelKind::Deadline), now);

    assert!(wide_root.is_cancelled());
    assert!(deep_root.is_cancelled());
    let descendants = wide_children
        .iter()
        .chain([&depth_1, &depth_2, &depth_3]);
    for descendant in descendants {
        assert!(descendant.is_cancelled());
        assert_eq!(
            descendant.reason().unwrap().kind,
            CancelKind::ParentCancelled
        );
    }
}
#[test]
fn meta_seeded_cascade_order_is_deterministic() {
    // Builds root -> {left -> left_leaf, right -> right_leaf} from a fixed
    // seed in a fixed creation order, so two builds are structurally identical.
    // Returned as [root, left, right, left_leaf, right_leaf].
    let build_tree = || {
        let mut rng = DetRng::new(44_445);
        let root = SymbolCancelToken::new(ObjectId::new_for_test(61), &mut rng);
        let left = root.child(&mut rng);
        let right = root.child(&mut rng);
        let left_leaf = left.child(&mut rng);
        let right_leaf = right.child(&mut rng);
        [root, left, right, left_leaf, right_leaf]
    };
    let tree_a = build_tree();
    let tree_b = build_tree();

    // Each listener appends its token's id to the tree's shared log, so the
    // log records the order in which cancellation reached each node.
    let log_a = Arc::new(StdMutex::new(Vec::new()));
    for token in &tree_a {
        attach_order_listener(token, &log_a);
    }
    let log_b = Arc::new(StdMutex::new(Vec::new()));
    for token in &tree_b {
        attach_order_listener(token, &log_b);
    }

    let now = Time::from_millis(7_100);
    let reason = CancelReason::new(CancelKind::Deadline);
    tree_a[0].cancel(&reason, now);
    tree_b[0].cancel(&reason, now);

    let observed_a = log_a.lock().unwrap().clone();
    let observed_b = log_b.lock().unwrap().clone();
    assert_eq!(
        observed_a, observed_b,
        "identical seeded cancellation trees must drain in the same observable order"
    );

    // Expected: pre-order, left subtree fully before the right subtree.
    let [root, left, right, left_leaf, right_leaf] = &tree_a;
    let expected = vec![
        root.token_id(),
        left.token_id(),
        left_leaf.token_id(),
        right.token_id(),
        right_leaf.token_id(),
    ];
    assert_eq!(
        observed_a, expected,
        "seeded drain order should follow deterministic parent-before-child traversal"
    );
}
#[test]
fn meta_cleanup_independence_property() {
    // Metamorphic relation: cleanup of one object is independent of any other
    // object tracked by the same coordinator.
    //   * Cleaning `obj1` on a shared coordinator (which also holds pending
    //     symbols for `obj2`) must match cleaning `obj1` on a coordinator
    //     that only ever saw `obj1` (`coord1`).
    //   * The shared coordinator's leftover pending state (only `obj2`'s)
    //     must match a coordinator that only ever saw `obj2` (`coord2`).
    let now = Time::from_millis(8000);
    let obj1 = ObjectId::new_for_test(70);
    let obj2 = ObjectId::new_for_test(71);

    // Shared: obj1 has a handler and 3 two-byte symbols; obj2 has no handler
    // and 2 three-byte symbols.
    let coordinator = CleanupCoordinator::new();
    coordinator.register_handler(obj1, CountingCleanupHandler);
    for i in 0..3 {
        coordinator.register_pending(obj1, Symbol::new_for_test(70, 0, i, &[1, 2]), now);
    }
    for i in 0..2 {
        coordinator.register_pending(obj2, Symbol::new_for_test(71, 0, i, &[3, 4, 5]), now);
    }

    // Isolated references.
    let coord1 = CleanupCoordinator::new();
    let coord2 = CleanupCoordinator::new();
    coord1.register_handler(obj1, CountingCleanupHandler);
    for i in 0..3 {
        coord1.register_pending(obj1, Symbol::new_for_test(70, 0, i, &[1, 2]), now);
    }
    for i in 0..2 {
        coord2.register_pending(obj2, Symbol::new_for_test(71, 0, i, &[3, 4, 5]), now);
    }

    // Relation 1: obj1's cleanup outcome is the same with or without obj2 present.
    let combined_result1 = coordinator.cleanup(obj1, None);
    let independent_result1 = coord1.cleanup(obj1, None);
    assert_eq!(
        combined_result1.symbols_cleaned,
        independent_result1.symbols_cleaned
    );
    assert_eq!(
        combined_result1.bytes_freed,
        independent_result1.bytes_freed
    );
    assert_eq!(combined_result1.completed, independent_result1.completed);

    // Relation 2: what remains pending on the shared coordinator is exactly
    // obj2's untouched state, as held by the obj2-only coordinator.
    let stats_after = coordinator.stats();
    assert_eq!(stats_after.pending_objects, 1);
    assert_eq!(stats_after.pending_symbols, 2);
    let obj2_only = coord2.stats();
    assert_eq!(stats_after.pending_objects, obj2_only.pending_objects);
    assert_eq!(stats_after.pending_symbols, obj2_only.pending_symbols);
    assert_eq!(stats_after.pending_bytes, obj2_only.pending_bytes);
}
#[test]
fn cancel_broadcast_metrics_debug_clone_default() {
    // A defaulted metrics snapshot renders via `Debug` and starts at zero.
    // NOTE(review): `let copied = metrics;` duplicates the value only if the
    // type is `Copy`; otherwise it is a plain move and `Clone` is not exercised.
    let metrics = CancelBroadcastMetrics::default();
    let dbg = format!("{metrics:?}");
    assert!(dbg.contains("CancelBroadcastMetrics"), "{dbg}");
    let copied = metrics;
    assert_eq!(copied.initiated, 0);
}
#[test]
fn cleanup_stats_debug_clone_default() {
    // A defaulted stats snapshot renders via `Debug` and reports no pending work.
    // NOTE(review): `let copied = stats;` duplicates the value only if the
    // type is `Copy`; otherwise it is a plain move and `Clone` is not exercised.
    let stats = CleanupStats::default();
    let dbg = format!("{stats:?}");
    assert!(dbg.contains("CleanupStats"), "{dbg}");
    let copied = stats;
    assert_eq!(copied.pending_objects, 0);
}
#[test]
fn cleanup_result_debug_clone() {
    let original = CleanupResult {
        object_id: ObjectId::new_for_test(1),
        symbols_cleaned: 5,
        bytes_freed: 1024,
        within_budget: true,
        completed: true,
        handlers_run: vec!["h1".to_string()],
        handler_errors: Vec::new(),
    };
    let dbg = format!("{original:?}");
    assert!(dbg.contains("CleanupResult"), "{dbg}");

    // `CleanupResult` owns `Vec` fields, so it cannot be `Copy`: a plain
    // `let cloned = original;` would only move it and never exercise `Clone`.
    // Clone explicitly and check the copy matches field-for-field while the
    // original is still alive.
    let cloned = original.clone();
    assert_eq!(cloned.object_id, original.object_id);
    assert_eq!(cloned.symbols_cleaned, 5);
    assert_eq!(cloned.symbols_cleaned, original.symbols_cleaned);
    assert_eq!(cloned.bytes_freed, original.bytes_freed);
    assert_eq!(cloned.within_budget, original.within_budget);
    assert!(cloned.completed);
    assert_eq!(cloned.handlers_run, original.handlers_run);
    assert_eq!(cloned.handler_errors.len(), original.handler_errors.len());
}
#[test]
fn cancel_strengthen_re_notifies_listeners_with_stronger_reason() {
    use std::sync::Arc;
    use std::sync::Mutex as StdMutex;

    // Sequence: User @100, User @150 (same severity), Shutdown @200 (stronger).
    // Expected: one User notification, at least one Shutdown re-notification,
    // and every notification stamped with the first-cancel time (100 ns).
    let mut rng = DetRng::new(0x_face_d00d);
    let token = SymbolCancelToken::new(ObjectId::new_for_test(7), &mut rng);
    let recorded: Arc<StdMutex<Vec<(crate::types::CancelKind, Time)>>> =
        Arc::new(StdMutex::new(Vec::new()));
    let sink = Arc::clone(&recorded);
    token.add_listener(move |reason: &CancelReason, at: Time| {
        sink.lock().unwrap().push((reason.kind, at));
    });

    let canonical = Time::from_nanos(100);
    let weak = CancelReason::new(crate::types::CancelKind::User);
    token.cancel(&weak, canonical);
    token.cancel(&weak, Time::from_nanos(150));
    token.cancel(
        &CancelReason::new(crate::types::CancelKind::Shutdown),
        Time::from_nanos(200),
    );

    let log = recorded.lock().unwrap().clone();
    assert!(
        log.len() >= 2,
        "listener must observe both the initial cancel and the strengthen, got {log:?}"
    );
    assert_eq!(
        log.first().map(|&(kind, _)| kind),
        Some(crate::types::CancelKind::User),
        "first notification must carry the initial weak reason, got {log:?}"
    );
    let saw_strengthen = log
        .iter()
        .any(|&(kind, at)| kind == crate::types::CancelKind::Shutdown && at == canonical);
    assert!(
        saw_strengthen,
        "listener must be re-notified with the strengthened reason, got {log:?}"
    );
    let user_count = log
        .iter()
        .filter(|&&(kind, _)| kind == crate::types::CancelKind::User)
        .count();
    assert_eq!(
        user_count, 1,
        "same-severity cancel must not re-fire listeners, got {log:?}"
    );
    assert!(
        log.iter().all(|&(_, at)| at == canonical),
        "all retained-listener notifications must use the canonical first-cancel timestamp, got {log:?}"
    );
}
#[test]
fn add_listener_post_cancel_uses_real_reason_not_fabricated_user() {
    use std::sync::Arc;
    use std::sync::Mutex as StdMutex;

    // The token is cancelled before the listener exists; registering it must
    // fire it exactly once with the recorded reason (Timeout) and timestamp.
    let mut rng = DetRng::new(0x_dead_beef);
    let token = SymbolCancelToken::new(ObjectId::new_for_test(11), &mut rng);
    token.cancel(
        &CancelReason::new(crate::types::CancelKind::Timeout),
        Time::from_nanos(42),
    );

    let observed: Arc<StdMutex<Vec<(crate::types::CancelKind, u64)>>> = Arc::default();
    let recorder = Arc::clone(&observed);
    token.add_listener(move |reason: &CancelReason, at: Time| {
        recorder.lock().unwrap().push((reason.kind, at.as_nanos()));
    });

    let log = observed.lock().unwrap().clone();
    assert_eq!(
        log.len(),
        1,
        "post-cancel add_listener must fire exactly once, got {log:?}"
    );
    let (kind, at_nanos) = log[0];
    assert_eq!(
        kind,
        crate::types::CancelKind::Timeout,
        "listener must observe the real reason (Timeout), \
         not the fabricated CancelKind::User"
    );
    assert_eq!(
        at_nanos, 42,
        "listener must observe the real cancelled_at time, not Time::ZERO"
    );
}
#[test]
fn cleanup_with_pending_but_no_handler_surfaces_typed_error() {
    let coord = CleanupCoordinator::new();
    let object_id = ObjectId::new_for_test(99);
    let now = Time::from_nanos(0);

    // Three 4-byte symbols queued for an object that never gets a handler.
    let payloads: [&[u8]; 3] = [&[1, 2, 3, 4], &[5, 6, 7, 8], &[9, 10, 11, 12]];
    for (esi, payload) in (0..).zip(payloads) {
        coord.register_pending(object_id, Symbol::new_for_test(99, 0, esi, payload), now);
    }

    let result = coord.cleanup(object_id, None);
    assert_eq!(
        result.symbols_cleaned, 0,
        "no-handler outcome must not claim symbols cleaned, got {result:?}"
    );
    assert_eq!(
        result.bytes_freed, 0,
        "no-handler outcome must not claim bytes freed, got {result:?}"
    );
    assert!(
        !result.completed,
        "no-handler outcome must mark completed=false, got {result:?}"
    );
    let reports_missing_handler = result
        .handler_errors
        .iter()
        .any(|e| e.contains("no cleanup handler"));
    assert!(
        reports_missing_handler,
        "missing-handler condition must surface as a typed error, got {:?}",
        result.handler_errors
    );

    // The pending entry must be put back (not marked completed) so a later
    // retry can still run it.
    let stats = coord.stats();
    assert_eq!(
        stats.pending_objects, 1,
        "pending set must be restored for retry, got {stats:?}"
    );
    let marked_completed = coord.completed.read().contains(&object_id);
    assert!(
        !marked_completed,
        "object_id must NOT be in completed set after no-handler outcome"
    );
}
#[test]
fn cancel_listener_panic_logged_via_counter() {
    // Listener that panics on every notification.
    struct AlwaysPanics;
    impl CancelListener for AlwaysPanics {
        fn on_cancel(&self, _reason: &CancelReason, _at: Time) {
            panic!("simulated listener panic (mzamuo)");
        }
    }

    let mut rng = DetRng::new(0xc0ffee);
    let token = SymbolCancelToken::new(ObjectId::new(1, 1), &mut rng);
    token.add_listener(AlwaysPanics);

    // Initial cancel: the panic must be absorbed and counted.
    token.cancel(&CancelReason::new(CancelKind::User), Time::from_nanos(100));
    let panics = token.listener_panic_count();
    assert!(panics >= 1, "expected listener_panic_count >= 1, got {}", panics);

    // Strengthening re-notifies the listener; that panic must be counted too.
    token.cancel(&CancelReason::new(CancelKind::Shutdown), Time::from_nanos(200));
    let panics = token.listener_panic_count();
    assert!(panics >= 2, "strengthen path must also count panics, got {}", panics);
}
#[test]
fn late_add_listener_drop_panic_logged_via_counter() {
    // Listener whose notification is a no-op but whose destructor panics.
    struct PanicsOnDrop;
    impl CancelListener for PanicsOnDrop {
        fn on_cancel(&self, _reason: &CancelReason, _at: Time) {}
    }
    impl Drop for PanicsOnDrop {
        fn drop(&mut self) {
            panic!("simulated late-add drop panic (mzamuo)");
        }
    }

    let mut rng = DetRng::new(0xd00d);
    let token = SymbolCancelToken::new(ObjectId::new(4, 4), &mut rng);
    token.cancel(&CancelReason::new(CancelKind::User), Time::from_nanos(1));

    // Registering after cancellation takes the late-add path; the test expects
    // that path to absorb the destructor panic and record it in the counter.
    let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
        token.add_listener(PanicsOnDrop);
    }));
    assert!(
        outcome.is_ok(),
        "late-add path must not propagate listener drop panic"
    );
    assert_eq!(token.listener_panic_count(), 1);
}
#[test]
fn mark_seen_never_exceeds_max_seen_transiently() {
    // Inserts three times the cap's worth of distinct sequences and checks,
    // after every single insert, that the seen-set is at or under the cap.
    // NOTE(review): this drives `CancelMarkSeenHarness::mark_seen`, a
    // test-local copy of the eviction logic, not the production coordinator's
    // method; if the two drift apart this test will not notice.
    let mut rng = DetRng::new(0xbeef);
    let token = SymbolCancelToken::new(ObjectId::new(7, 7), &mut rng);
    let harness = CancelMarkSeenHarness {
        seen_sequences: parking_lot::RwLock::new(SeenSequences::default()),
        max_seen: 5,
    };
    let (object_id, token_id) = (token.object_id(), token.token_id());

    (0..15u64).for_each(|i| {
        harness.mark_seen(object_id, token_id, i);
        let len = harness.seen_sequences.read().set.len();
        assert!(
            len <= harness.max_seen,
            "seen.set.len()={len} exceeded max_seen={} after insert {i}",
            harness.max_seen
        );
    });
}
/// Test-only holder for a bounded `SeenSequences` dedup set plus its cap,
/// with a `mark_seen` that evicts before inserting.
struct CancelMarkSeenHarness {
    /// Dedup set of `(object_id, token_id, sequence)` keys.
    seen_sequences: parking_lot::RwLock<SeenSequences>,
    /// Upper bound on `seen_sequences.set.len()` enforced by `mark_seen`.
    max_seen: usize,
}
impl CancelMarkSeenHarness {
    /// Records `(object_id, token_id, sequence)` if it is not already present.
    ///
    /// Room is made *before* inserting, by calling `remove_oldest` while the
    /// set is at or above `max_seen`, so the set never exceeds the cap even
    /// momentarily (for `max_seen >= 1`). The eviction loop stops early if
    /// `remove_oldest` reports nothing left; with `max_seen == 0` that means
    /// the set drains and the new key is still inserted.
    fn mark_seen(&self, object_id: ObjectId, token_id: u64, sequence: u64) {
        let key = (object_id, token_id, sequence);
        let mut seen = self.seen_sequences.write();
        if !seen.set.contains(&key) {
            while seen.set.len() >= self.max_seen && seen.remove_oldest().is_some() {}
            seen.insert(key);
        }
    }
}
#[test]
fn child_inherits_parent_cancelled_at_atomically() {
    // A child created from an already-cancelled parent is born cancelled and
    // carries the parent's original cancellation timestamp.
    let cancel_time = Time::from_nanos(500);
    let mut rng = DetRng::new(0x1234);
    let parent = SymbolCancelToken::new(ObjectId::new(2, 2), &mut rng);
    parent.cancel(&CancelReason::new(CancelKind::User), cancel_time);

    let child = parent.child(&mut rng);
    assert!(child.is_cancelled());
    let inherited_at = child.cancelled_at().map(Time::as_nanos);
    assert_eq!(
        inherited_at,
        Some(cancel_time.as_nanos()),
        "child must inherit the cancelled_at the parent had \
         at the moment of the is_cancelled check (snapshot under lock)"
    );
}
/// Regression test: a child created while the parent's cancellation is only
/// partially published must end up with the parent's *final* `cancelled_at`.
///
/// The main thread hand-builds the in-flight state:
/// - it holds the parent's `reason` write lock,
/// - it sets `cancelled = true`,
/// - it stores `u64::MAX` into `cancelled_at`. NOTE(review): this test treats
///   `u64::MAX` as the "not yet published" value; confirm it matches the
///   sentinel used by `cancel`.
///
/// It then spawns a thread that calls `child()`, publishes the real
/// timestamp, and releases the lock. The child must report 777 ns.
#[test]
fn child_waits_for_inflight_cancelled_at_publication() {
use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
};
let mut rng = DetRng::new(0x5678);
let parent = SymbolCancelToken::new(ObjectId::new(3, 3), &mut rng);
let cancel_time = Time::from_nanos(777);
// Set by the spawned thread as soon as it starts running.
let started = Arc::new(AtomicBool::new(false));
// Simulate a cancel that has begun but not finished. The `reason` write
// lock is held until `drop(reason_guard)` below, so anything that takes
// this lock blocks until then.
let mut reason_guard = parent.state.reason.write();
*reason_guard = Some(CancelReason::new(CancelKind::User));
parent.state.cancelled.store(true, Ordering::Release);
// Placeholder timestamp; the real value is published further down.
parent.state.cancelled_at.store(u64::MAX, Ordering::Release);
let parent_for_child = parent.clone();
let started_for_child = started.clone();
let join = std::thread::spawn(move || {
started_for_child.store(true, Ordering::Release);
let mut child_rng = DetRng::new(0x9abc);
let child = parent_for_child.child(&mut child_rng);
child.cancelled_at().map(Time::as_nanos)
});
// Bounded wait for the spawned thread to start (10_000 short sleeps).
const MAX_WAIT_RETRIES: u32 = 10000;
for _attempt in 0..MAX_WAIT_RETRIES {
if started.load(Ordering::Acquire) {
break;
}
std::thread::sleep(std::time::Duration::from_nanos(100));
}
assert!(
started.load(Ordering::Acquire),
"Test thread failed to start within timeout"
);
// Publish the real timestamp first, then release the lock so the
// in-flight cancel looks complete. NOTE(review): `started` only proves the
// thread is running, not that `child()` has reached its wait, so this
// ordering does not strictly force the child onto the wait path.
parent
.state
.cancelled_at
.store(cancel_time.as_nanos(), Ordering::Release);
drop(reason_guard);
let child_cancelled_at = join.join().expect("child thread must complete");
assert_eq!(child_cancelled_at, Some(cancel_time.as_nanos()));
}
/// Regression test: while a late `child()` call waits for the parent's
/// in-flight `cancelled_at`, it must not hold the parent's `children` write
/// lock.
///
/// The main thread hand-builds a half-published cancel:
/// - it holds the `reason` write lock,
/// - it sets `cancelled = true`,
/// - it stores `u64::MAX` into `cancelled_at`.
///
/// It spawns a thread that calls `child()`, gives it about 1 ms to reach its
/// wait, and then checks that `children.try_write()` succeeds. Finally it
/// publishes the real timestamp and releases the lock, and the child must
/// report 991 ns.
#[test]
fn child_wait_for_cancelled_at_does_not_hold_children_lock() {
use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
};
let mut rng = DetRng::new(0x53A9_0001);
let parent = SymbolCancelToken::new(ObjectId::new(4, 4), &mut rng);
// Set by the spawned thread as soon as it starts running.
let started = Arc::new(AtomicBool::new(false));
// Hold `reason` for write until `drop(reason_guard)` below, leaving the
// cancel half-published. `u64::MAX` is this test's stand-in for an
// unpublished timestamp (NOTE(review): confirm against `cancel`).
let mut reason_guard = parent.state.reason.write();
*reason_guard = Some(CancelReason::new(CancelKind::User));
parent.state.cancelled.store(true, Ordering::Release);
parent.state.cancelled_at.store(u64::MAX, Ordering::Release);
let parent_for_child = parent.clone();
let started_for_child = Arc::clone(&started);
let join = std::thread::spawn(move || {
started_for_child.store(true, Ordering::Release);
let mut child_rng = DetRng::new(0x53A9_0002);
let child = parent_for_child.child(&mut child_rng);
child.cancelled_at().map(Time::as_nanos)
});
// Bounded wait for the spawned thread to start.
const MAX_WAIT_RETRIES: u32 = 10_000;
for _attempt in 0..MAX_WAIT_RETRIES {
if started.load(Ordering::Acquire) {
break;
}
std::thread::sleep(std::time::Duration::from_nanos(100));
}
assert!(
started.load(Ordering::Acquire),
"child thread failed to start within timeout"
);
// Heuristic grace period so `child()` is likely blocked in its wait before
// probing the lock (timing-based; not a hard synchronization point).
std::thread::sleep(std::time::Duration::from_millis(1));
// `try_write` returns `None` if someone currently holds `children`.
assert!(
parent.state.children.try_write().is_some(),
"late child creation must not hold children.write() while waiting for cancelled_at"
);
// Complete the cancel: publish the real timestamp, then release the lock.
let cancel_time = Time::from_nanos(991);
parent
.state
.cancelled_at
.store(cancel_time.as_nanos(), Ordering::Release);
drop(reason_guard);
let child_cancelled_at = join.join().expect("child thread must complete");
assert_eq!(child_cancelled_at, Some(cancel_time.as_nanos()));
}
#[test]
fn notify_listeners_bounded_iteration_prevents_livelock() {
    use std::sync::{
        Arc,
        atomic::{AtomicU32, Ordering},
    };
    use std::thread;
    use std::time::{Duration, Instant};

    // Number of listeners registered on the token.
    const LISTENERS: u32 = 5;
    // A livelocked notify loop would never finish, so any generous finite
    // bound separates "terminates" from "spins forever". Kept well above
    // scheduler noise so the test is not flaky on loaded CI machines
    // (matches the 1 s bound used by the child-creation livelock regression).
    const LIVELOCK_BOUND: Duration = Duration::from_secs(1);

    let mut rng = DetRng::new(0x4321);
    let token = SymbolCancelToken::new(ObjectId::new(42, 0), &mut rng);
    let notification_count = Arc::new(AtomicU32::new(0));
    for _ in 0..LISTENERS {
        let count = Arc::clone(&notification_count);
        token.add_listener(move |_reason: &CancelReason, _time: Time| {
            count.fetch_add(1, Ordering::Relaxed);
            std::hint::spin_loop();
        });
    }

    // Initial cancel: notifies every listener once.
    let initial_time = Time::from_nanos(1000);
    token.cancel(&CancelReason::new(CancelKind::Timeout), initial_time);

    // One thread keeps escalating the cancel kind while another issues its
    // own cancel, so notification passes overlap with strengthen attempts.
    let token_for_strengthener = token.clone();
    let strengthener_thread = thread::spawn(move || {
        for severity in [
            CancelKind::Deadline,
            CancelKind::Shutdown,
            CancelKind::FailFast,
        ] {
            thread::sleep(Duration::from_millis(1));
            token_for_strengthener.cancel(&CancelReason::new(severity), initial_time);
        }
    });
    let start = Instant::now();
    let token_for_notify = token.clone();
    let notification_thread = thread::spawn(move || {
        token_for_notify.cancel(&CancelReason::new(CancelKind::User), initial_time);
    });

    // A successful join already proves each thread ran to completion.
    strengthener_thread
        .join()
        .expect("strengthener thread should complete");
    notification_thread
        .join()
        .expect("notification thread should complete");
    let elapsed = start.elapsed();
    assert!(
        elapsed < LIVELOCK_BOUND,
        "Notification should complete quickly, took {:?}",
        elapsed
    );

    // The initial cancel alone notifies every listener once, so the total
    // can never be below the listener count.
    let final_count = notification_count.load(Ordering::Relaxed);
    assert!(
        final_count >= LISTENERS,
        "Listeners should have been notified, count: {}",
        final_count
    );
}
/// Regression test for a livelock between `child()` and an in-flight cancel.
///
/// A "cancel" thread hand-simulates a slow cancellation, in this order:
/// 1. it takes the token's `reason` write lock,
/// 2. it sets `cancelled`,
/// 3. it announces itself through `cancel_started`,
/// 4. it sleeps 10 ms with the lock still held,
/// 5. it publishes `cancelled_at` (12_345 ms) and then the reason.
///
/// A "child" thread waits for that announcement and calls `child()` mid-cancel.
///
/// Expectations:
/// - `child()` returns in under 500 ms, and the whole test in under 1 s;
/// - the parent ends up with the real reason;
/// - the child inherits the parent's final `cancelled_at`.
#[test]
fn cancelled_at_snapshot_for_child_livelock_regression() {
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Barrier};
use std::thread;
use std::time::{Duration, Instant};
let mut rng = DetRng::new(0x1234_5678);
let object_id = ObjectId::new_for_test(0x1234_5678);
let token = SymbolCancelToken::new(object_id, &mut rng);
// Releases both threads together.
let barrier = Arc::new(Barrier::new(2));
// Set once the cancel thread holds the lock and has set `cancelled`.
let cancel_started = Arc::new(AtomicBool::new(false));
// Set once `child()` has returned on the child thread.
let child_created = Arc::new(AtomicBool::new(false));
let token_for_cancel = token.clone();
let barrier_for_cancel = Arc::clone(&barrier);
let cancel_started_for_cancel = Arc::clone(&cancel_started);
let cancel_thread = thread::spawn(move || {
barrier_for_cancel.wait();
let reason = CancelReason::user("livelock test");
// Hand-rolled, deliberately slow cancel. `reason_guard` is never dropped
// explicitly, so the write lock is held until this closure returns.
let mut reason_guard = token_for_cancel.state.reason.write();
token_for_cancel
.state
.cancelled
.store(true, Ordering::Release);
cancel_started_for_cancel.store(true, Ordering::Release);
// Widen the window in which `child()` observes a half-published cancel.
thread::sleep(Duration::from_millis(10));
token_for_cancel.state.cancelled_at.store(
crate::types::Time::from_millis(12345).as_nanos(),
Ordering::Release,
);
*reason_guard = Some(reason);
});
let token_for_child = token.clone();
let barrier_for_child = Arc::clone(&barrier);
let cancel_started_for_child = Arc::clone(&cancel_started);
let child_created_for_child = Arc::clone(&child_created);
let child_thread = thread::spawn(move || {
barrier_for_child.wait();
// Unbounded poll until the cancel thread is inside its critical section.
while !cancel_started_for_child.load(Ordering::Acquire) {
thread::sleep(Duration::from_nanos(100));
}
let start = Instant::now();
let mut child_rng = DetRng::new(0x8765_4321);
let child = token_for_child.child(&mut child_rng);
let elapsed = start.elapsed();
// A failure here panics this thread and surfaces as a join error below.
assert!(
elapsed < Duration::from_millis(500),
"Child creation should not livelock, took {:?}",
elapsed
);
child_created_for_child.store(true, Ordering::Release);
child
});
let start = Instant::now();
cancel_thread.join().expect("Cancel thread should complete");
let child = child_thread.join().expect("Child thread should complete");
let total_elapsed = start.elapsed();
assert!(
total_elapsed < Duration::from_secs(1),
"Test should complete quickly, took {:?}",
total_elapsed
);
assert!(
cancel_started.load(Ordering::Acquire),
"Cancel should have started"
);
assert!(
child_created.load(Ordering::Acquire),
"Child should have been created without livelock"
);
assert!(token.is_cancelled(), "Token should be cancelled");
assert!(
token.cancelled_at().is_some(),
"Cancelled timestamp should be available"
);
assert_eq!(
token.reason(),
Some(CancelReason::user("livelock test")),
"manual race setup must still publish a real final cancel reason"
);
assert_eq!(
child.cancelled_at(),
Some(crate::types::Time::from_millis(12345)),
"child created during in-flight cancel must inherit the canonical parent timestamp"
);
}
#[test]
fn cleanup_coordinator_buffers_symbols_during_retry() {
    use std::sync::Arc;
    use std::sync::Mutex;

    // Fails its first call. On later calls it asserts that it received both
    // original symbols plus the two registered after the failure, then
    // reports them cleaned.
    #[derive(Debug)]
    struct FailingHandler {
        attempts: Arc<Mutex<u32>>,
    }
    impl CleanupHandler for FailingHandler {
        fn name(&self) -> &'static str {
            "failing_test_handler"
        }
        fn cleanup(
            &self,
            _object_id: ObjectId,
            symbols: Vec<Symbol>,
        ) -> crate::error::Result<usize> {
            let mut attempts = self.attempts.lock().unwrap();
            *attempts += 1;
            if *attempts == 1 {
                return Err(
                    crate::error::Error::new(crate::error::ErrorKind::ConnectionLost)
                        .with_message("simulated failure"),
                );
            }
            assert_eq!(symbols.len(), 4, "Should have initial + buffered symbols");
            let payloads: Vec<&[u8]> = symbols.iter().map(Symbol::data).collect();
            let wanted: [&[u8]; 4] = [
                b"initial1",
                b"initial2",
                b"during_cleanup1",
                b"during_cleanup2",
            ];
            for want in wanted {
                assert!(payloads.iter().any(|payload| *payload == want));
            }
            Ok(symbols.len())
        }
    }

    // NOTE(review): despite the name, the "during_cleanup" symbols are
    // registered after the first (failed) cleanup has returned.
    let coordinator = CleanupCoordinator::new();
    let object_id = ObjectId::new_for_test(42);
    let now = Time::from_nanos(1000);
    for (esi, payload) in (0..).zip([b"initial1", b"initial2"]) {
        coordinator.register_pending(object_id, Symbol::new_for_test(42, 0, esi, payload), now);
    }
    let attempts = Arc::new(Mutex::new(0u32));
    coordinator.register_handler(
        object_id,
        FailingHandler {
            attempts: Arc::clone(&attempts),
        },
    );

    // Attempt 1 fails; its symbols must be kept for the retry.
    let result1 = coordinator.cleanup(object_id, None);
    assert!(
        !result1.completed,
        "First cleanup should fail and not complete"
    );
    assert!(
        !result1.handler_errors.is_empty(),
        "Should have handler error"
    );

    for (esi, payload) in (2..).zip([b"during_cleanup1", b"during_cleanup2"]) {
        coordinator.register_pending(object_id, Symbol::new_for_test(42, 0, esi, payload), now);
    }
    let stats = coordinator.stats();
    assert_eq!(
        stats.pending_objects, 1,
        "Should have pending object after failure"
    );

    // Attempt 2 sees all four symbols and succeeds.
    let result2 = coordinator.cleanup(object_id, None);
    assert!(result2.completed, "Second cleanup should succeed");
    assert!(
        result2.handler_errors.is_empty(),
        "Should have no handler errors"
    );
    assert_eq!(
        result2.symbols_cleaned, 4,
        "Should clean initial + buffered symbols"
    );

    let final_stats = coordinator.stats();
    assert_eq!(
        final_stats.pending_objects, 0,
        "Should have no pending objects"
    );
    assert_eq!(
        final_stats.pending_symbols, 0,
        "Should have no pending symbols"
    );
    assert_eq!(
        *attempts.lock().unwrap(),
        2,
        "Handler should be called twice"
    );
}
#[test]
fn cleanup_coordinator_no_symbols_lost_during_concurrent_registration() {
    // NOTE(review): despite the name, nothing here runs concurrently; the
    // extra symbols are registered sequentially after a failed cleanup.
    // Property: a failed attempt restores its symbols, and later
    // registrations stack on top with nothing lost *or duplicated*. That is
    // why the counts are exact: `>=` would also accept a restore that
    // re-adds the original symbol twice.

    // Handler that always fails.
    #[derive(Debug)]
    struct AlwaysFailHandler;
    impl CleanupHandler for AlwaysFailHandler {
        fn name(&self) -> &'static str {
            "always_fail"
        }
        fn cleanup(&self, _: ObjectId, _: Vec<Symbol>) -> crate::error::Result<usize> {
            Err(crate::error::Error::new(crate::error::ErrorKind::Internal)
                .with_message("always fails"))
        }
    }

    let coordinator = CleanupCoordinator::new();
    let object_id = ObjectId::new_for_test(99);
    let now = Time::from_nanos(2000);
    coordinator.register_pending(object_id, Symbol::new_for_test(99, 0, 0, b"original"), now);
    coordinator.register_handler(object_id, AlwaysFailHandler);

    let result = coordinator.cleanup(object_id, None);
    assert!(!result.completed);

    coordinator.register_pending(
        object_id,
        Symbol::new_for_test(99, 0, 1, b"during_retry1"),
        now,
    );
    coordinator.register_pending(
        object_id,
        Symbol::new_for_test(99, 0, 2, b"during_retry2"),
        now,
    );
    let stats = coordinator.stats();
    assert_eq!(stats.pending_objects, 1);
    assert_eq!(
        stats.pending_symbols, 3,
        "All symbols should be preserved, got {}",
        stats.pending_symbols
    );

    coordinator.register_pending(
        object_id,
        Symbol::new_for_test(99, 0, 3, b"after_retry"),
        now,
    );
    let final_stats = coordinator.stats();
    assert_eq!(
        final_stats.pending_symbols, 4,
        "All symbols including post-retry should be preserved"
    );
}
#[test]
fn cleanup_reentrant_attempt_is_rejected_without_stealing_retry_state() {
    use std::sync::{Arc, Mutex};

    // From inside its own cleanup callback, this handler registers one more
    // symbol and then re-enters `cleanup` for the same object, stashing the
    // nested result for later inspection.
    struct ReentrantHandler {
        coordinator: Arc<CleanupCoordinator>,
        nested_result: Arc<Mutex<Option<CleanupResult>>>,
    }
    impl CleanupHandler for ReentrantHandler {
        fn name(&self) -> &'static str {
            "reentrant"
        }
        fn cleanup(
            &self,
            object_id: ObjectId,
            _symbols: Vec<Symbol>,
        ) -> crate::error::Result<usize> {
            let late = Symbol::new_for_test(123, 0, 1, b"late-symbol");
            self.coordinator
                .register_pending(object_id, late, Time::from_millis(101));
            let nested = self.coordinator.cleanup(object_id, None);
            *self.nested_result.lock().unwrap() = Some(nested);
            Ok(1)
        }
    }

    let object_id = ObjectId::new_for_test(123);
    let coordinator = Arc::new(CleanupCoordinator::new());
    let nested_slot = Arc::new(Mutex::new(None));
    coordinator.register_pending(
        object_id,
        Symbol::new_for_test(123, 0, 0, b"initial"),
        Time::from_millis(100),
    );
    let handler = ReentrantHandler {
        coordinator: Arc::clone(&coordinator),
        nested_result: Arc::clone(&nested_slot),
    };
    coordinator.register_handler(object_id, handler);

    let outer = coordinator.cleanup(object_id, None);
    assert!(outer.completed, "outer cleanup should still complete");

    // The nested call must have been refused outright, without touching state.
    let nested = nested_slot
        .lock()
        .unwrap()
        .take()
        .expect("nested cleanup result should be recorded");
    assert!(
        !nested.completed,
        "reentrant cleanup attempt must fail closed"
    );
    assert_eq!(nested.symbols_cleaned, 0);
    assert_eq!(nested.bytes_freed, 0);
    let rejected_as_in_progress = nested
        .handler_errors
        .iter()
        .any(|err| err.contains("cleanup already in progress"));
    assert!(
        rejected_as_in_progress,
        "expected reentrant cleanup error, got {:?}",
        nested.handler_errors
    );

    // After the outer pass completes, nothing is left pending and the object
    // is recorded as completed.
    let stats = coordinator.stats();
    assert_eq!(stats.pending_objects, 0);
    assert_eq!(stats.pending_symbols, 0);
    assert_eq!(stats.pending_bytes, 0);
    assert!(coordinator.completed.read().contains(&object_id));
}
#[test]
fn cleanup_completed_path_scrubs_reentrant_handler_re_registration() {
    use std::sync::Arc;

    // While running, this handler registers a fresh handler for the same
    // object. A completed cleanup must not let that registration survive.
    struct ReRegisteringHandler {
        coordinator: Arc<CleanupCoordinator>,
    }
    impl CleanupHandler for ReRegisteringHandler {
        fn name(&self) -> &'static str {
            "re-registering"
        }
        fn cleanup(
            &self,
            object_id: ObjectId,
            _symbols: Vec<Symbol>,
        ) -> crate::error::Result<usize> {
            self.coordinator
                .register_handler(object_id, CountingCleanupHandler);
            Ok(1)
        }
    }

    const PAYLOAD: &[u8] = b"initial";
    let object_id = ObjectId::new_for_test(1234);
    let coordinator = Arc::new(CleanupCoordinator::new());
    coordinator.register_pending(
        object_id,
        Symbol::new_for_test(1234, 0, 0, PAYLOAD),
        Time::from_millis(200),
    );
    let handler = ReRegisteringHandler {
        coordinator: Arc::clone(&coordinator),
    };
    coordinator.register_handler(object_id, handler);

    let result = coordinator.cleanup(object_id, None);
    assert!(result.completed, "cleanup should still complete");
    assert_eq!(result.symbols_cleaned, 1);
    assert_eq!(result.bytes_freed, PAYLOAD.len());

    let handler_survived = coordinator.handlers.read().contains_key(&object_id);
    assert!(
        !handler_survived,
        "completed cleanup must scrub handlers re-registered during the callback"
    );
    assert!(coordinator.completed.read().contains(&object_id));
}
#[test]
fn cleanup_buffered_only_reopen_restores_handler_for_retry() {
    // Seed only the in-flight `cleanup_buffer` (nothing in `pending`). The
    // first cleanup must notice the buffered symbol, reopen retry state, and
    // keep the handler so a second cleanup can finish the job.
    const PAYLOAD: &[u8] = b"buffered-only";
    let coordinator = CleanupCoordinator::new();
    let object_id = ObjectId::new_for_test(124);
    coordinator.register_handler(object_id, CountingCleanupHandler);
    coordinator
        .cleanup_buffer
        .write()
        .insert(object_id, vec![Symbol::new_for_test(124, 0, 0, PAYLOAD)]);

    let first = coordinator.cleanup(object_id, None);
    assert!(
        !first.completed,
        "buffered symbols arriving during an otherwise empty cleanup must reopen retry state"
    );
    let handler_kept = coordinator.handlers.read().contains_key(&object_id);
    assert!(
        handler_kept,
        "buffered-only reopen must restore the per-object handler"
    );
    let stats = coordinator.stats();
    assert_eq!(stats.pending_objects, 1);
    assert_eq!(stats.pending_symbols, 1);
    assert_eq!(stats.pending_bytes, PAYLOAD.len());

    let second = coordinator.cleanup(object_id, None);
    assert!(
        second.completed,
        "restored handler should allow retry to finish"
    );
    assert_eq!(second.symbols_cleaned, 1);
    assert_eq!(second.bytes_freed, PAYLOAD.len());
}
#[test]
fn cancel_broadcaster_tracks_pending_retries_in_metrics() {
    // Sink whose sends and broadcasts always succeed immediately.
    #[derive(Debug)]
    struct AlwaysOkSink;
    impl CancelSink for AlwaysOkSink {
        fn send_to(
            &self,
            _peer: &PeerId,
            _msg: &CancelMessage,
        ) -> impl std::future::Future<Output = crate::error::Result<()>> + Send {
            std::future::ready(Ok(()))
        }
        fn broadcast(
            &self,
            _msg: &CancelMessage,
        ) -> impl std::future::Future<Output = crate::error::Result<usize>> + Send {
            std::future::ready(Ok(1))
        }
    }

    // `metrics().pending_retries` must mirror the retry queue's length as
    // it goes 0 -> 1 -> 0.
    let broadcaster = CancelBroadcaster::new(AlwaysOkSink);
    let queued_count = || broadcaster.metrics().pending_retries;
    assert_eq!(queued_count(), 0, "Should start with no pending retries");

    let queued = CancelMessage::new(
        42,
        ObjectId::new_for_test(123),
        CancelKind::User,
        Time::from_nanos(1000),
        1,
    );
    broadcaster.pending_retries.write().push_back(queued);
    assert_eq!(queued_count(), 1, "Should show 1 pending retry");

    broadcaster.pending_retries.write().clear();
    assert_eq!(
        queued_count(),
        0,
        "Should show no pending retries after clear"
    );
}
/// Regression test: `retry_failed_broadcasts` must have a single owner. While
/// one retry pass is in progress, a concurrent caller must return without
/// broadcasting or dequeuing anything, leaving the FIFO queue to the owning
/// pass.
///
/// Mechanism:
/// 1. The sink's first `broadcast` call signals entry over an mpsc channel
///    and then blocks on a condvar gate.
/// 2. The owning pass runs on a spawned thread. Once it is parked inside that
///    first broadcast, the test runs a second pass on the test thread and
///    checks that it did nothing.
/// 3. The test then opens the gate and checks that the owner drained both
///    queued messages.
///
/// `retry_failed_broadcasts()` resolves to a pair. Judging from the asserts,
/// `.0` is the number of messages re-broadcast and `.1` an optional error.
///
/// NOTE(review): if an assertion fails before the gate is opened, the owner
/// thread stays parked on the condvar and is never joined. A drop guard that
/// opens the gate would make such failures cleaner.
#[test]
fn cancel_broadcaster_serializes_concurrent_retry_passes() {
use std::sync::Condvar;
use std::sync::mpsc;
use std::time::Duration;
// Sink that blocks only its first `broadcast` call until the gate opens.
#[derive(Debug)]
struct BlockingFirstRetrySink {
// Total number of `broadcast` calls made.
broadcast_calls: Arc<AtomicUsize>,
// Taken by the first call and used to announce that it has started.
first_call_entered: std::sync::Mutex<Option<mpsc::Sender<()>>>,
// `true` once the test allows the first call to finish.
release_gate: Arc<(std::sync::Mutex<bool>, Condvar)>,
}
impl CancelSink for BlockingFirstRetrySink {
fn send_to(
&self,
_peer: &PeerId,
_msg: &CancelMessage,
) -> impl std::future::Future<Output = crate::error::Result<()>> + Send {
std::future::ready(Ok(()))
}
fn broadcast(
&self,
_msg: &CancelMessage,
) -> impl std::future::Future<Output = crate::error::Result<usize>> + Send {
let call_index = self.broadcast_calls.fetch_add(1, Ordering::SeqCst);
// Only call #0 gets the sender, so only call #0 blocks below.
let entered = if call_index == 0 {
self.first_call_entered.lock().unwrap().take()
} else {
None
};
let release_gate = Arc::clone(&self.release_gate);
async move {
if let Some(entered) = entered {
entered.send(()).expect("first retry should signal entry");
// Deliberately blocks the executing thread (std Condvar, not an
// async wait). This is acceptable because the owner pass is driven
// by `block_on` on its own dedicated thread.
let (released_lock, released_cv) = &*release_gate;
let mut released = released_lock.lock().unwrap();
while !*released {
released = released_cv.wait(released).unwrap();
}
}
Ok(1)
}
}
}
let (entered_tx, entered_rx) = mpsc::channel();
let release_gate = Arc::new((std::sync::Mutex::new(false), Condvar::new()));
let broadcast_calls = Arc::new(AtomicUsize::new(0));
let sink = BlockingFirstRetrySink {
broadcast_calls: Arc::clone(&broadcast_calls),
first_call_entered: std::sync::Mutex::new(Some(entered_tx)),
release_gate: Arc::clone(&release_gate),
};
let broadcaster = Arc::new(CancelBroadcaster::new(sink));
let object_id = ObjectId::new_for_test(321);
// Queue two retries for the same token (different timestamps/sequences).
broadcaster
.pending_retries
.write()
.push_back(CancelMessage::new(
1,
object_id,
CancelKind::User,
Time::from_nanos(10),
0,
));
broadcaster
.pending_retries
.write()
.push_back(CancelMessage::new(
1,
object_id,
CancelKind::User,
Time::from_nanos(20),
1,
));
// Owner pass: runs on its own thread and parks inside the first broadcast.
let retry_owner = Arc::clone(&broadcaster);
let retry_handle = std::thread::spawn(move || {
futures_lite::future::block_on(retry_owner.retry_failed_broadcasts())
});
entered_rx
.recv_timeout(Duration::from_secs(1))
.expect("primary retry loop should enter first broadcast");
// Concurrent pass while the owner is parked. The gate is still closed, so
// this call can only return if it does not wait on the owner.
let concurrent_result =
futures_lite::future::block_on(broadcaster.retry_failed_broadcasts());
assert_eq!(
concurrent_result.0, 0,
"concurrent retry callers must not steal later messages from the FIFO queue"
);
assert!(
concurrent_result.1.is_none(),
"concurrent retry callers must return without surfacing an error: {:?}",
concurrent_result.1
);
// The owner has dequeued only the message it is broadcasting.
assert_eq!(
broadcaster.pending_retries.read().len(),
1,
"the second message should remain queued for the active retry loop"
);
// Open the gate so the owner can finish its first broadcast and continue.
let (released_lock, released_cv) = &*release_gate;
*released_lock.lock().unwrap() = true;
released_cv.notify_all();
let owner_result = retry_handle.join().expect("retry thread should join");
assert_eq!(
owner_result.0, 2,
"the owning retry pass should drain both queued messages in order"
);
assert!(
owner_result.1.is_none(),
"the owning retry pass should complete without surfacing an error: {:?}",
owner_result.1
);
assert_eq!(
broadcaster.pending_retries.read().len(),
0,
"all retry messages should be drained after the owner completes"
);
assert_eq!(
broadcast_calls.load(Ordering::SeqCst),
2,
"only the owning retry pass should broadcast the queued messages"
);
}
}
#[cfg(test)]
#[path = "symbol_cancel_metamorphic.rs"]
mod symbol_cancel_metamorphic;