use std::collections::BTreeMap;
use std::sync::atomic::{AtomicU64, Ordering};
use crate::types::cancel::CancelReason;
use crate::types::outcome::PanicPayload;
use crate::types::{Outcome, RegionId, TaskId, Time};
/// Process-wide source of fresh [`MonitorRef`] identifiers; the first id handed out is 1.
static MONITOR_COUNTER: AtomicU64 = AtomicU64::new(1);

/// Opaque handle identifying one monitor relationship.
///
/// Handles compare, order and hash by their numeric id.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct MonitorRef(u64);

impl MonitorRef {
    /// Allocates the next identifier from the global counter.
    #[inline]
    fn new() -> Self {
        // `Relaxed` is enough here: uniqueness only needs the increment itself
        // to be atomic, and no other memory is published through this counter.
        Self(MONITOR_COUNTER.fetch_add(1, Ordering::Relaxed))
    }

    /// Builds a handle with an explicit id (unit tests only).
    #[cfg(test)]
    fn from_raw(id: u64) -> Self {
        Self(id)
    }

    /// Builds a handle with an explicit id, bypassing the global counter.
    ///
    /// Hidden from the rendered docs; intended for test code outside this module.
    #[doc(hidden)]
    #[must_use]
    #[inline]
    pub const fn new_for_test(id: u64) -> Self {
        Self(id)
    }

    /// Returns the raw numeric id behind this handle.
    ///
    /// `const` so that, like [`MonitorRef::new_for_test`], it can be used in
    /// constant contexts.
    #[must_use]
    #[inline]
    pub const fn id(self) -> u64 {
        self.0
    }
}

impl std::fmt::Display for MonitorRef {
    /// Renders the handle as `MonitorRef(<id>)`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "MonitorRef({})", self.0)
    }
}
/// Reason carried by a [`DownNotification`] describing how the monitored task ended.
///
/// Each variant mirrors one arm of the task `Outcome` handled in
/// `DownReason::from_task_outcome`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DownReason {
/// The task finished with `Outcome::Ok`.
Normal,
/// The task finished with `Outcome::Err`; holds the error rendered as text.
Error(String),
/// The task finished with `Outcome::Cancelled`; holds the cancel reason.
Cancelled(CancelReason),
/// The task finished with `Outcome::Panicked`; holds the panic payload.
Panicked(PanicPayload),
}
impl DownReason {
#[must_use]
#[inline]
pub fn from_task_outcome(outcome: &Outcome<(), crate::error::Error>) -> Self {
match outcome {
Outcome::Ok(()) => Self::Normal,
Outcome::Err(e) => Self::Error(format!("{e}")),
Outcome::Cancelled(r) => Self::Cancelled(r.clone()),
Outcome::Panicked(p) => Self::Panicked(p.clone()),
}
}
#[must_use]
#[inline]
pub fn is_normal(&self) -> bool {
matches!(self, Self::Normal)
}
#[must_use]
#[inline]
pub fn is_error(&self) -> bool {
matches!(self, Self::Error(_))
}
#[must_use]
#[inline]
pub fn is_cancelled(&self) -> bool {
matches!(self, Self::Cancelled(_))
}
#[must_use]
#[inline]
pub fn is_panicked(&self) -> bool {
matches!(self, Self::Panicked(_))
}
}
impl std::fmt::Display for DownReason {
    /// Writes a lowercase tag for the variant, followed by `": "` and the
    /// payload for every variant except `Normal`.
    ///
    /// The cancel reason is rendered with its `Debug` form; the error text and
    /// the panic payload use `Display`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Normal => f.write_str("normal"),
            Self::Error(e) => write!(f, "error: {e}"),
            Self::Cancelled(r) => write!(f, "cancelled: {r:?}"),
            Self::Panicked(p) => write!(f, "panicked: {p}"),
        }
    }
}
/// Message describing that a monitored task went down.
#[derive(Debug, Clone)]
pub struct DownNotification {
/// The task that went down.
pub monitored: TaskId,
/// How the task ended.
pub reason: DownReason,
/// The monitor this notification belongs to.
pub monitor_ref: MonitorRef,
}
/// Internal record stored per monitor in `MonitorSet::by_ref`.
#[derive(Debug, Clone)]
struct MonitorRecord {
// Task that established the monitor.
watcher: TaskId,
// Region passed alongside the watcher at establish time; key of the region index.
watcher_region: RegionId,
// Task being watched; key of the monitored index.
monitored: TaskId,
}
/// Collection of active monitors with two secondary indexes.
///
/// `by_ref` is the primary table; `by_monitored` and `by_watcher_region` list
/// the refs per monitored task and per watcher region so that either side can
/// be cleaned up without scanning every record.
#[derive(Debug)]
// NOTE(review): presumably allowed because all three fields share the `by_` prefix.
#[allow(clippy::struct_field_names)]
pub struct MonitorSet {
// Primary table: monitor ref -> record.
by_ref: BTreeMap<MonitorRef, MonitorRecord>,
// Index: monitored task -> refs of monitors watching it, in establish order.
by_monitored: BTreeMap<TaskId, Vec<MonitorRef>>,
// Index: watcher region -> refs of monitors established from it, in establish order.
by_watcher_region: BTreeMap<RegionId, Vec<MonitorRef>>,
}
impl MonitorSet {
    /// Creates an empty monitor set.
    #[must_use]
    pub fn new() -> Self {
        Self {
            by_ref: BTreeMap::new(),
            by_monitored: BTreeMap::new(),
            by_watcher_region: BTreeMap::new(),
        }
    }

    /// Registers a new monitor and returns its freshly allocated handle.
    ///
    /// Every call creates a distinct monitor, even when the same watcher
    /// monitors the same task more than once.
    ///
    /// * `watcher` - task that wants to be told when `monitored` goes down.
    /// * `watcher_region` - region recorded for the watcher; used by
    ///   [`MonitorSet::cleanup_region`].
    /// * `monitored` - task being watched.
    pub fn establish(
        &mut self,
        watcher: TaskId,
        watcher_region: RegionId,
        monitored: TaskId,
    ) -> MonitorRef {
        let monitor_ref = MonitorRef::new();
        self.by_ref.insert(
            monitor_ref,
            MonitorRecord {
                watcher,
                watcher_region,
                monitored,
            },
        );
        self.by_monitored
            .entry(monitored)
            .or_default()
            .push(monitor_ref);
        self.by_watcher_region
            .entry(watcher_region)
            .or_default()
            .push(monitor_ref);
        monitor_ref
    }

    /// Removes a single monitor.
    ///
    /// Returns `true` if the monitor existed and was removed, `false` if the
    /// handle was unknown.
    pub fn demonitor(&mut self, monitor_ref: MonitorRef) -> bool {
        let Some(record) = self.by_ref.remove(&monitor_ref) else {
            return false;
        };
        Self::unlink(&mut self.by_monitored, &record.monitored, monitor_ref);
        Self::unlink(
            &mut self.by_watcher_region,
            &record.watcher_region,
            monitor_ref,
        );
        true
    }

    /// Lists `(monitor handle, watcher task)` pairs for every monitor currently
    /// watching `monitored`, in the order the monitors were established.
    ///
    /// Returns an empty vector when nothing watches the task.
    #[must_use]
    pub fn watchers_of(&self, monitored: TaskId) -> Vec<(MonitorRef, TaskId)> {
        let Some(refs) = self.by_monitored.get(&monitored) else {
            return Vec::new();
        };
        refs.iter()
            .filter_map(|mref| self.by_ref.get(mref).map(|rec| (*mref, rec.watcher)))
            .collect()
    }

    /// Drops every monitor that watches `monitored` and returns the removed
    /// handles.
    pub fn remove_monitored(&mut self, monitored: TaskId) -> Vec<MonitorRef> {
        let Some(refs) = self.by_monitored.remove(&monitored) else {
            return Vec::new();
        };
        let mut removed = Vec::with_capacity(refs.len());
        for mref in refs {
            if let Some(record) = self.by_ref.remove(&mref) {
                // The monitored-side entry is already gone; only the region
                // index still needs fixing.
                Self::unlink(&mut self.by_watcher_region, &record.watcher_region, mref);
                removed.push(mref);
            }
        }
        removed
    }

    /// Drops every monitor that was established with `region` as the watcher
    /// region and returns the removed handles.
    pub fn cleanup_region(&mut self, region: RegionId) -> Vec<MonitorRef> {
        let Some(refs) = self.by_watcher_region.remove(&region) else {
            return Vec::new();
        };
        let mut removed = Vec::with_capacity(refs.len());
        for mref in refs {
            if let Some(record) = self.by_ref.remove(&mref) {
                // The region-side entry is already gone; only the monitored
                // index still needs fixing.
                Self::unlink(&mut self.by_monitored, &record.monitored, mref);
                removed.push(mref);
            }
        }
        removed
    }

    /// Number of active monitors.
    #[must_use]
    pub fn len(&self) -> usize {
        self.by_ref.len()
    }

    /// Returns `true` when no monitors are registered.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.by_ref.is_empty()
    }

    /// Watcher task of the given monitor, or `None` if the handle is unknown.
    #[must_use]
    pub fn watcher_of(&self, monitor_ref: MonitorRef) -> Option<TaskId> {
        self.by_ref.get(&monitor_ref).map(|r| r.watcher)
    }

    /// Monitored task of the given monitor, or `None` if the handle is unknown.
    #[must_use]
    pub fn monitored_of(&self, monitor_ref: MonitorRef) -> Option<TaskId> {
        self.by_ref.get(&monitor_ref).map(|r| r.monitored)
    }

    /// Removes `monitor_ref` from the list stored under `key` in a secondary
    /// index, deleting the key once its list becomes empty so the index never
    /// keeps empty buckets.
    fn unlink<K: Ord>(
        index: &mut BTreeMap<K, Vec<MonitorRef>>,
        key: &K,
        monitor_ref: MonitorRef,
    ) {
        if let Some(refs) = index.get_mut(key) {
            refs.retain(|r| *r != monitor_ref);
            if refs.is_empty() {
                index.remove(key);
            }
        }
    }
}
/// The default set is the empty set built by `MonitorSet::new`.
impl Default for MonitorSet {
fn default() -> Self {
Self::new()
}
}
/// Accumulates down notifications so they can be emitted in a deterministic
/// order via `DownBatch::into_sorted`.
#[derive(Debug)]
pub struct DownBatch {
// Pending entries in insertion order; sorted only when the batch is consumed.
entries: Vec<DownBatchEntry>,
}
/// One pending notification together with its primary sort key.
#[derive(Debug, Clone)]
struct DownBatchEntry {
// Time value supplied to `DownBatch::push`; first sort key in `into_sorted`.
completion_vt: Time,
// The notification to deliver.
notification: DownNotification,
}
impl DownBatch {
    /// Creates an empty batch.
    #[must_use]
    pub fn new() -> Self {
        Self {
            entries: Vec::new(),
        }
    }

    /// Queues `notification`, tagged with the time `completion_vt`.
    pub fn push(&mut self, completion_vt: Time, notification: DownNotification) {
        let entry = DownBatchEntry {
            completion_vt,
            notification,
        };
        self.entries.push(entry);
    }

    /// Number of queued notifications.
    #[must_use]
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// Returns `true` when nothing has been queued.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }

    /// Consumes the batch and returns its notifications ordered by
    /// `(completion_vt, monitored, monitor_ref)`, each compared ascending.
    ///
    /// The sort is stable, so entries whose three keys are all equal keep
    /// their insertion order.
    #[must_use]
    pub fn into_sorted(mut self) -> Vec<DownNotification> {
        self.entries.sort_by(|lhs, rhs| {
            // Tuples compare lexicographically, which yields the
            // time -> task -> monitor-ref tie-break chain.
            let lhs_key = (
                &lhs.completion_vt,
                &lhs.notification.monitored,
                &lhs.notification.monitor_ref,
            );
            let rhs_key = (
                &rhs.completion_vt,
                &rhs.notification.monitored,
                &rhs.notification.monitor_ref,
            );
            lhs_key.cmp(&rhs_key)
        });
        self.entries
            .into_iter()
            .map(|entry| entry.notification)
            .collect()
    }
}
/// The default batch is the empty batch built by `DownBatch::new`.
impl Default for DownBatch {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
// Shorthand for building a `TaskId` from an index and a generation.
fn test_task_id(index: u32, generation: u32) -> TaskId {
TaskId::new_for_test(index, generation)
}
// Shorthand for building a `RegionId` from an index and a generation.
fn test_region_id(index: u32, generation: u32) -> RegionId {
RegionId::new_for_test(index, generation)
}
// Two refs allocated back to back differ, and the later one compares greater.
#[test]
fn monitor_ref_uniqueness() {
let r1 = MonitorRef::new();
let r2 = MonitorRef::new();
assert_ne!(r1, r2);
assert!(r1 < r2); }
// `Display` renders a ref as `MonitorRef(<id>)`.
#[test]
fn monitor_ref_display() {
let r = MonitorRef::from_raw(42);
assert_eq!(format!("{r}"), "MonitorRef(42)");
}
// Refs are ordered by their raw id.
#[test]
fn monitor_ref_ordering() {
let r1 = MonitorRef::from_raw(1);
let r2 = MonitorRef::from_raw(2);
let r3 = MonitorRef::from_raw(3);
assert!(r1 < r2);
assert!(r2 < r3);
}
// Each `is_*` predicate is true for its own variant; `Normal` and `Error` are
// additionally checked against each other's predicate.
#[test]
fn down_reason_predicates() {
assert!(DownReason::Normal.is_normal());
assert!(!DownReason::Normal.is_error());
assert!(DownReason::Error("oops".into()).is_error());
assert!(!DownReason::Error("oops".into()).is_normal());
assert!(DownReason::Cancelled(CancelReason::default()).is_cancelled());
assert!(DownReason::Panicked(PanicPayload::new("boom")).is_panicked());
}
// `Display` prints "normal" for `Normal` and includes the payload text for
// `Error` and `Panicked`.
#[test]
fn down_reason_display() {
assert_eq!(format!("{}", DownReason::Normal), "normal");
assert!(format!("{}", DownReason::Error("fail".into())).contains("fail"));
assert!(format!("{}", DownReason::Panicked(PanicPayload::new("boom"))).contains("boom"));
}
// An `Ok` outcome maps to `DownReason::Normal`.
#[test]
fn down_reason_from_task_outcome_ok() {
let outcome: Outcome<(), crate::error::Error> = Outcome::ok(());
let reason = DownReason::from_task_outcome(&outcome);
assert!(reason.is_normal());
}
// A cancelled outcome maps to `DownReason::Cancelled`.
#[test]
fn down_reason_from_task_outcome_cancelled() {
let outcome: Outcome<(), crate::error::Error> = Outcome::cancelled(CancelReason::default());
let reason = DownReason::from_task_outcome(&outcome);
assert!(reason.is_cancelled());
}
// A panicked outcome maps to `DownReason::Panicked`.
#[test]
fn down_reason_from_task_outcome_panicked() {
let outcome: Outcome<(), crate::error::Error> =
Outcome::panicked(PanicPayload::new("test"));
let reason = DownReason::from_task_outcome(&outcome);
assert!(reason.is_panicked());
}
// Establishing one monitor records both the watcher and the monitored task.
#[test]
fn establish_creates_monitor() {
let mut set = MonitorSet::new();
let watcher = test_task_id(1, 0);
let region = test_region_id(0, 0);
let target = test_task_id(2, 0);
let mref = set.establish(watcher, region, target);
assert_eq!(set.len(), 1);
assert_eq!(set.watcher_of(mref), Some(watcher));
assert_eq!(set.monitored_of(mref), Some(target));
}
// Two different watchers on the same target get distinct refs and both show
// up in `watchers_of`.
#[test]
fn establish_multiple_monitors_same_target() {
let mut set = MonitorSet::new();
let w1 = test_task_id(1, 0);
let w2 = test_task_id(2, 0);
let region = test_region_id(0, 0);
let target = test_task_id(3, 0);
let m1 = set.establish(w1, region, target);
let m2 = set.establish(w2, region, target);
assert_ne!(m1, m2);
assert_eq!(set.len(), 2);
let watchers = set.watchers_of(target);
assert_eq!(watchers.len(), 2);
}
// The same watcher monitoring the same target twice creates two separate monitors.
#[test]
fn establish_same_watcher_twice_yields_distinct_refs() {
let mut set = MonitorSet::new();
let watcher = test_task_id(1, 0);
let region = test_region_id(0, 0);
let target = test_task_id(2, 0);
let m1 = set.establish(watcher, region, target);
let m2 = set.establish(watcher, region, target);
assert_ne!(m1, m2);
assert_eq!(set.len(), 2);
}
// `demonitor` on a known ref returns true and leaves the set empty.
#[test]
fn demonitor_removes_monitor() {
let mut set = MonitorSet::new();
let watcher = test_task_id(1, 0);
let region = test_region_id(0, 0);
let target = test_task_id(2, 0);
let mref = set.establish(watcher, region, target);
assert!(set.demonitor(mref));
assert_eq!(set.len(), 0);
assert!(set.watchers_of(target).is_empty());
}
// `demonitor` on an unknown ref reports false.
#[test]
fn demonitor_nonexistent_returns_false() {
let mut set = MonitorSet::new();
assert!(!set.demonitor(MonitorRef::from_raw(999)));
}
// Removing one of two monitors on the same target leaves the other in place.
#[test]
fn demonitor_only_removes_specific_monitor() {
let mut set = MonitorSet::new();
let w1 = test_task_id(1, 0);
let w2 = test_task_id(2, 0);
let region = test_region_id(0, 0);
let target = test_task_id(3, 0);
let m1 = set.establish(w1, region, target);
let _m2 = set.establish(w2, region, target);
set.demonitor(m1);
assert_eq!(set.len(), 1);
assert_eq!(set.watchers_of(target).len(), 1);
}
// Querying a task nobody monitors yields an empty list.
#[test]
fn watchers_of_empty() {
let set = MonitorSet::new();
assert!(set.watchers_of(test_task_id(99, 0)).is_empty());
}
// `watchers_of` reports every (ref, watcher) pair established for the target.
#[test]
fn watchers_of_returns_all_watchers() {
let mut set = MonitorSet::new();
let region = test_region_id(0, 0);
let target = test_task_id(10, 0);
let w1 = test_task_id(1, 0);
let w2 = test_task_id(2, 0);
let w3 = test_task_id(3, 0);
let m1 = set.establish(w1, region, target);
let m2 = set.establish(w2, region, target);
let m3 = set.establish(w3, region, target);
let watchers = set.watchers_of(target);
assert_eq!(watchers.len(), 3);
let mrefs: Vec<MonitorRef> = watchers.iter().map(|(r, _)| *r).collect();
assert!(mrefs.contains(&m1));
assert!(mrefs.contains(&m2));
assert!(mrefs.contains(&m3));
let tids: Vec<TaskId> = watchers.iter().map(|(_, t)| *t).collect();
assert!(tids.contains(&w1));
assert!(tids.contains(&w2));
assert!(tids.contains(&w3));
}
// `remove_monitored` returns and removes every monitor on the target.
#[test]
fn remove_monitored_clears_all_watchers() {
let mut set = MonitorSet::new();
let region = test_region_id(0, 0);
let target = test_task_id(10, 0);
set.establish(test_task_id(1, 0), region, target);
set.establish(test_task_id(2, 0), region, target);
let removed = set.remove_monitored(target);
assert_eq!(removed.len(), 2);
assert!(set.is_empty());
assert!(set.watchers_of(target).is_empty());
}
// Removing one target's monitors does not touch monitors on another target.
#[test]
fn remove_monitored_preserves_other_monitors() {
let mut set = MonitorSet::new();
let region = test_region_id(0, 0);
let t1 = test_task_id(10, 0);
let t2 = test_task_id(20, 0);
let watcher = test_task_id(1, 0);
set.establish(watcher, region, t1);
set.establish(watcher, region, t2);
set.remove_monitored(t1);
assert_eq!(set.len(), 1);
assert_eq!(set.watchers_of(t2).len(), 1);
}
// `cleanup_region` removes only the monitors whose watcher region matches.
#[test]
fn cleanup_region_removes_all_monitors_in_region() {
let mut set = MonitorSet::new();
let r1 = test_region_id(1, 0);
let r2 = test_region_id(2, 0);
let target = test_task_id(10, 0);
set.establish(test_task_id(1, 0), r1, target);
set.establish(test_task_id(2, 0), r2, target);
let removed = set.cleanup_region(r1);
assert_eq!(removed.len(), 1);
assert_eq!(set.len(), 1);
assert_eq!(set.watchers_of(target).len(), 1);
}
// Cleaning up a region with no monitors returns nothing.
#[test]
fn cleanup_region_empty_is_noop() {
let mut set = MonitorSet::new();
let removed = set.cleanup_region(test_region_id(99, 0));
assert!(removed.is_empty());
}
// After a region cleanup the target no longer lists the removed watcher.
#[test]
fn cleanup_region_cleans_monitored_index() {
let mut set = MonitorSet::new();
let region = test_region_id(1, 0);
let target = test_task_id(10, 0);
set.establish(test_task_id(1, 0), region, target);
set.cleanup_region(region);
assert!(set.watchers_of(target).is_empty());
}
// A fresh batch is empty and sorts to an empty vector.
#[test]
fn down_batch_empty() {
let batch = DownBatch::new();
assert!(batch.is_empty());
assert_eq!(batch.len(), 0);
assert!(batch.into_sorted().is_empty());
}
// A batch with one entry returns that entry unchanged.
#[test]
fn down_batch_single_item() {
let mut batch = DownBatch::new();
let notif = DownNotification {
monitored: test_task_id(1, 0),
reason: DownReason::Normal,
monitor_ref: MonitorRef::from_raw(1),
};
batch.push(Time::from_nanos(100), notif);
assert_eq!(batch.len(), 1);
let sorted = batch.into_sorted();
assert_eq!(sorted.len(), 1);
assert_eq!(sorted[0].monitored, test_task_id(1, 0));
}
// Entries pushed at times 300, 100, 200 come out in ascending time order
// (tasks 2, 3, 1).
#[test]
fn down_batch_sorts_by_virtual_time() {
let mut batch = DownBatch::new();
batch.push(
Time::from_nanos(300),
DownNotification {
monitored: test_task_id(1, 0),
reason: DownReason::Normal,
monitor_ref: MonitorRef::from_raw(1),
},
);
batch.push(
Time::from_nanos(100),
DownNotification {
monitored: test_task_id(2, 0),
reason: DownReason::Normal,
monitor_ref: MonitorRef::from_raw(2),
},
);
batch.push(
Time::from_nanos(200),
DownNotification {
monitored: test_task_id(3, 0),
reason: DownReason::Normal,
monitor_ref: MonitorRef::from_raw(3),
},
);
let sorted = batch.into_sorted();
assert_eq!(sorted[0].monitored, test_task_id(2, 0)); assert_eq!(sorted[1].monitored, test_task_id(3, 0)); assert_eq!(sorted[2].monitored, test_task_id(1, 0)); }
// With identical times, entries are ordered by the monitored task id
// (expected order here: tasks 1, 3, 5).
#[test]
fn down_batch_tie_breaks_by_task_id() {
let mut batch = DownBatch::new();
let same_vt = Time::from_nanos(100);
batch.push(
same_vt,
DownNotification {
monitored: test_task_id(5, 0),
reason: DownReason::Normal,
monitor_ref: MonitorRef::from_raw(1),
},
);
batch.push(
same_vt,
DownNotification {
monitored: test_task_id(1, 0),
reason: DownReason::Normal,
monitor_ref: MonitorRef::from_raw(2),
},
);
batch.push(
same_vt,
DownNotification {
monitored: test_task_id(3, 0),
reason: DownReason::Normal,
monitor_ref: MonitorRef::from_raw(3),
},
);
let sorted = batch.into_sorted();
assert_eq!(sorted[0].monitored, test_task_id(1, 0));
assert_eq!(sorted[1].monitored, test_task_id(3, 0));
assert_eq!(sorted[2].monitored, test_task_id(5, 0));
}
// With identical time and target, entries are ordered by monitor ref id.
#[test]
fn down_batch_tie_breaks_duplicate_target_by_monitor_ref() {
let mut batch = DownBatch::new();
let same_vt = Time::from_nanos(100);
let same_target = test_task_id(7, 0);
batch.push(
same_vt,
DownNotification {
monitored: same_target,
reason: DownReason::Normal,
monitor_ref: MonitorRef::from_raw(3),
},
);
batch.push(
same_vt,
DownNotification {
monitored: same_target,
reason: DownReason::Normal,
monitor_ref: MonitorRef::from_raw(1),
},
);
batch.push(
same_vt,
DownNotification {
monitored: same_target,
reason: DownReason::Normal,
monitor_ref: MonitorRef::from_raw(2),
},
);
let sorted = batch.into_sorted();
let refs: Vec<u64> = sorted.into_iter().map(|n| n.monitor_ref.id()).collect();
assert_eq!(refs, vec![1, 2, 3]);
}
// Pushes two tasks that differ in both index and generation at the same time.
// NOTE(review): despite the name, this only asserts that both entries survive
// and are distinct; the actual relative order depends on `TaskId`'s `Ord`,
// which is defined elsewhere and is not pinned here.
#[test]
fn down_batch_tie_breaks_by_generation_then_slot() {
let mut batch = DownBatch::new();
let same_vt = Time::from_nanos(100);
batch.push(
same_vt,
DownNotification {
monitored: test_task_id(1, 2), reason: DownReason::Normal,
monitor_ref: MonitorRef::from_raw(1),
},
);
batch.push(
same_vt,
DownNotification {
monitored: test_task_id(2, 1), reason: DownReason::Normal,
monitor_ref: MonitorRef::from_raw(2),
},
);
let sorted = batch.into_sorted();
assert_eq!(sorted.len(), 2);
let first = sorted[0].monitored;
let second = sorted[1].monitored;
assert_ne!(first, second);
}
// Mixed times and task ids: time is the primary key, task id breaks ties
// (expected order: tasks 2, 5 at t=100, then tasks 1, 3 at t=200).
#[test]
fn down_batch_mixed_vt_and_tid_ordering() {
let mut batch = DownBatch::new();
batch.push(
Time::from_nanos(200),
DownNotification {
monitored: test_task_id(3, 0),
reason: DownReason::Normal,
monitor_ref: MonitorRef::from_raw(1),
},
);
batch.push(
Time::from_nanos(100),
DownNotification {
monitored: test_task_id(5, 0),
reason: DownReason::Error("err".into()),
monitor_ref: MonitorRef::from_raw(2),
},
);
batch.push(
Time::from_nanos(100),
DownNotification {
monitored: test_task_id(2, 0),
reason: DownReason::Cancelled(CancelReason::default()),
monitor_ref: MonitorRef::from_raw(3),
},
);
batch.push(
Time::from_nanos(200),
DownNotification {
monitored: test_task_id(1, 0),
reason: DownReason::Panicked(PanicPayload::new("boom")),
monitor_ref: MonitorRef::from_raw(4),
},
);
let sorted = batch.into_sorted();
assert_eq!(sorted[0].monitored, test_task_id(2, 0));
assert_eq!(sorted[1].monitored, test_task_id(5, 0));
assert_eq!(sorted[2].monitored, test_task_id(1, 0));
assert_eq!(sorted[3].monitored, test_task_id(3, 0));
}
// Full flow: establish two monitors, build one notification per watcher,
// sort the batch, then remove both targets and check the set is empty.
#[test]
fn end_to_end_monitor_to_notification() {
let mut set = MonitorSet::new();
let region = test_region_id(0, 0);
let watcher = test_task_id(1, 0);
let target1 = test_task_id(10, 0);
let target2 = test_task_id(20, 0);
let m1 = set.establish(watcher, region, target1);
let m2 = set.establish(watcher, region, target2);
let completion_vt = Time::from_nanos(500);
let mut batch = DownBatch::new();
for (mref, _watcher_tid) in set.watchers_of(target1) {
batch.push(
completion_vt,
DownNotification {
monitored: target1,
reason: DownReason::Normal,
monitor_ref: mref,
},
);
}
for (mref, _watcher_tid) in set.watchers_of(target2) {
batch.push(
completion_vt,
DownNotification {
monitored: target2,
reason: DownReason::Error("fail".into()),
monitor_ref: mref,
},
);
}
let sorted = batch.into_sorted();
assert_eq!(sorted.len(), 2);
assert_eq!(sorted[0].monitored, target1);
assert_eq!(sorted[0].monitor_ref, m1);
assert!(sorted[0].reason.is_normal());
assert_eq!(sorted[1].monitored, target2);
assert_eq!(sorted[1].monitor_ref, m2);
assert!(sorted[1].reason.is_error());
set.remove_monitored(target1);
set.remove_monitored(target2);
assert!(set.is_empty());
}
// Once the watcher's region is cleaned up, the target has no watchers left
// to notify.
#[test]
fn region_cleanup_prevents_stale_notifications() {
let mut set = MonitorSet::new();
let region = test_region_id(1, 0);
let watcher = test_task_id(1, 0);
let target = test_task_id(10, 0);
set.establish(watcher, region, target);
set.cleanup_region(region);
assert!(set.watchers_of(target).is_empty());
assert!(set.is_empty());
}
// Four watchers on one target each yield their own notification, all with
// the same target and reason.
#[test]
fn conformance_multiple_watchers_independent_notifications() {
let mut set = MonitorSet::new();
let region = test_region_id(0, 0);
let target = test_task_id(100, 0);
let w1 = test_task_id(1, 0);
let w2 = test_task_id(2, 0);
let w3 = test_task_id(3, 0);
let w4 = test_task_id(4, 0);
let m1 = set.establish(w1, region, target);
let m2 = set.establish(w2, region, target);
let m3 = set.establish(w3, region, target);
let m4 = set.establish(w4, region, target);
let watchers = set.watchers_of(target);
assert_eq!(watchers.len(), 4);
let completion_vt = Time::from_nanos(1000);
let mut batch = DownBatch::new();
for (mref, _watcher) in &watchers {
batch.push(
completion_vt,
DownNotification {
monitored: target,
reason: DownReason::Error("crash".into()),
monitor_ref: *mref,
},
);
}
let sorted = batch.into_sorted();
assert_eq!(sorted.len(), 4, "each watcher must receive a notification");
for notif in &sorted {
assert_eq!(notif.monitored, target);
assert!(notif.reason.is_error());
}
let mrefs: Vec<MonitorRef> = sorted.iter().map(|n| n.monitor_ref).collect();
assert!(mrefs.contains(&m1));
assert!(mrefs.contains(&m2));
assert!(mrefs.contains(&m3));
assert!(mrefs.contains(&m4));
}
// Five targets go down at the same time and are pushed in reverse order; the
// sorted output must follow ascending target order, and repeating the same
// batch ten times must give the same result each time.
#[test]
fn conformance_simultaneous_downs_deterministic_order() {
let mut set = MonitorSet::new();
let region = test_region_id(0, 0);
let watcher = test_task_id(1, 0);
let targets: Vec<TaskId> = (10..15).map(|i| test_task_id(i, 0)).collect();
let mrefs: Vec<MonitorRef> = targets
.iter()
.map(|t| set.establish(watcher, region, *t))
.collect();
let same_vt = Time::from_nanos(500);
let mut batch = DownBatch::new();
for i in (0..5).rev() {
batch.push(
same_vt,
DownNotification {
monitored: targets[i],
reason: DownReason::Error(format!("error_{i}")),
monitor_ref: mrefs[i],
},
);
}
let sorted = batch.into_sorted();
assert_eq!(sorted.len(), 5);
for (i, notif) in sorted.iter().enumerate() {
assert_eq!(
notif.monitored,
targets[i],
"notification {i} should be for target tid({})",
10 + i
);
}
for _trial in 0..10 {
let mut batch2 = DownBatch::new();
for i in (0..5).rev() {
batch2.push(
same_vt,
DownNotification {
monitored: targets[i],
reason: DownReason::Error(format!("error_{i}")),
monitor_ref: mrefs[i],
},
);
}
let sorted2 = batch2.into_sorted();
for (i, notif) in sorted2.iter().enumerate() {
assert_eq!(notif.monitored, targets[i]);
}
}
}
// Two time groups with two targets each: output is ordered by time first and
// by target id inside each group (expected: t_d, t_b at 100; t_a, t_c at 200).
#[test]
fn conformance_mixed_vt_deterministic_interleaving() {
let mut set = MonitorSet::new();
let region = test_region_id(0, 0);
let watcher = test_task_id(1, 0);
let t_a = test_task_id(5, 0);
let t_b = test_task_id(3, 0);
let t_c = test_task_id(8, 0);
let t_d = test_task_id(2, 0);
let m_a = set.establish(watcher, region, t_a);
let m_b = set.establish(watcher, region, t_b);
let m_c = set.establish(watcher, region, t_c);
let m_d = set.establish(watcher, region, t_d);
let mut batch = DownBatch::new();
batch.push(
Time::from_nanos(200),
DownNotification {
monitored: t_a,
reason: DownReason::Error("a".into()),
monitor_ref: m_a,
},
);
batch.push(
Time::from_nanos(100),
DownNotification {
monitored: t_b,
reason: DownReason::Panicked(PanicPayload::new("b")),
monitor_ref: m_b,
},
);
batch.push(
Time::from_nanos(200),
DownNotification {
monitored: t_c,
reason: DownReason::Normal,
monitor_ref: m_c,
},
);
batch.push(
Time::from_nanos(100),
DownNotification {
monitored: t_d,
reason: DownReason::Cancelled(CancelReason::default()),
monitor_ref: m_d,
},
);
let sorted = batch.into_sorted();
assert_eq!(sorted[0].monitored, t_d); assert_eq!(sorted[1].monitored, t_b); assert_eq!(sorted[2].monitored, t_a); assert_eq!(sorted[3].monitored, t_c); }
// Two watchers in different regions monitor the same target; after one region
// is cleaned up, only the watcher from the other region is notified.
#[test]
fn conformance_cancellation_cleanup_cross_region() {
let mut set = MonitorSet::new();
let r_closing = test_region_id(1, 0);
let r_open = test_region_id(2, 0);
let target = test_task_id(100, 0);
let w_closing = test_task_id(1, 0);
let w_open = test_task_id(2, 0);
set.establish(w_closing, r_closing, target);
let m_open = set.establish(w_open, r_open, target);
let removed = set.cleanup_region(r_closing);
assert_eq!(removed.len(), 1);
let watchers = set.watchers_of(target);
assert_eq!(watchers.len(), 1);
assert_eq!(watchers[0].0, m_open);
assert_eq!(watchers[0].1, w_open);
let mut batch = DownBatch::new();
for (mref, _) in &watchers {
batch.push(
Time::from_nanos(500),
DownNotification {
monitored: target,
reason: DownReason::Error("target died".into()),
monitor_ref: *mref,
},
);
}
let sorted = batch.into_sorted();
assert_eq!(
sorted.len(),
1,
"only the open-region watcher gets notified"
);
assert_eq!(sorted[0].monitor_ref, m_open);
}
// Cleaning up regions one after another keeps the primary table and the
// monitored index in agreement, ending with an empty set.
#[test]
fn conformance_cleanup_index_consistency() {
let mut set = MonitorSet::new();
let r1 = test_region_id(1, 0);
let r2 = test_region_id(2, 0);
let t1 = test_task_id(1, 0);
let t2 = test_task_id(2, 0);
let t3 = test_task_id(3, 0);
let target = test_task_id(100, 0);
set.establish(t1, r1, target);
set.establish(t2, r1, target);
let m3 = set.establish(t3, r2, target);
set.cleanup_region(r1);
assert_eq!(set.len(), 1);
assert_eq!(set.watchers_of(target).len(), 1);
assert_eq!(set.watcher_of(m3), Some(t3));
assert_eq!(set.monitored_of(m3), Some(target));
set.cleanup_region(r2);
assert!(set.is_empty());
assert!(set.watchers_of(target).is_empty());
}
// Wraps each `DownReason` variant in a notification and checks the matching
// predicate on it.
// NOTE(review): the name mentions severity ordering, but no ordering between
// variants is asserted here.
#[test]
fn conformance_monotone_severity_in_down() {
let outcomes = vec![
("Normal", DownReason::Normal),
("Error", DownReason::Error("fail".into())),
("Cancelled", DownReason::Cancelled(CancelReason::default())),
("Panicked", DownReason::Panicked(PanicPayload::new("boom"))),
];
for (name, reason) in outcomes {
let notif = DownNotification {
monitored: test_task_id(1, 0),
reason: reason.clone(),
monitor_ref: MonitorRef::from_raw(1),
};
match name {
"Normal" => assert!(notif.reason.is_normal()),
"Error" => assert!(notif.reason.is_error()),
"Cancelled" => assert!(notif.reason.is_cancelled()),
"Panicked" => assert!(notif.reason.is_panicked()),
_ => unreachable!(),
}
}
}
// Mixing `remove_monitored` and `cleanup_region` drains the set completely,
// leaving no watchers behind for either target.
#[test]
fn conformance_sequential_cleanup_no_leaks() {
let mut set = MonitorSet::new();
let r1 = test_region_id(1, 0);
let r2 = test_region_id(2, 0);
let w1 = test_task_id(1, 0);
let w2 = test_task_id(2, 0);
let t1 = test_task_id(10, 0);
let t2 = test_task_id(20, 0);
set.establish(w1, r1, t1);
set.establish(w1, r1, t2);
set.establish(w2, r2, t1);
assert_eq!(set.len(), 3);
set.remove_monitored(t1);
assert_eq!(set.len(), 1);
set.cleanup_region(r1);
assert!(set.is_empty());
assert!(set.watchers_of(t1).is_empty());
assert!(set.watchers_of(t2).is_empty());
assert_eq!(set.len(), 0);
}
// Demonitoring one of three watchers removes only that watcher from the
// target's watcher list.
#[test]
fn conformance_demonitor_selective_cancellation() {
let mut set = MonitorSet::new();
let region = test_region_id(0, 0);
let target = test_task_id(100, 0);
let w1 = test_task_id(1, 0);
let w2 = test_task_id(2, 0);
let w3 = test_task_id(3, 0);
let m1 = set.establish(w1, region, target);
let _m2 = set.establish(w2, region, target);
let _m3 = set.establish(w3, region, target);
assert!(set.demonitor(m1));
let watchers = set.watchers_of(target);
assert_eq!(watchers.len(), 2);
let watcher_tids: Vec<TaskId> = watchers.iter().map(|(_, t)| *t).collect();
assert!(
!watcher_tids.contains(&w1),
"demonitored watcher must not appear"
);
assert!(watcher_tids.contains(&w2));
assert!(watcher_tids.contains(&w3));
}
// Exercises the derived traits on `MonitorRef`: Debug output, copy semantics,
// equality, ordering, and use as a `HashSet` key.
#[test]
fn monitor_ref_debug_clone_copy_eq_hash_ord() {
use std::collections::HashSet;
let r = MonitorRef::from_raw(42);
let dbg = format!("{r:?}");
assert!(dbg.contains("MonitorRef"));
let r2 = r;
assert_eq!(r, r2);
let r3 = r;
assert_eq!(r, r3);
let r4 = MonitorRef::from_raw(100);
assert!(r < r4);
let mut set = HashSet::new();
set.insert(r);
set.insert(r4);
assert_eq!(set.len(), 2);
}
// Exercises the derived traits on `DownReason`: Debug output, clone, and
// (in)equality between variants.
#[test]
fn down_reason_debug_clone_eq() {
let d = DownReason::Normal;
let dbg = format!("{d:?}");
assert!(dbg.contains("Normal"));
let d2 = d.clone();
assert_eq!(d, d2);
let d3 = DownReason::Error("oops".into());
assert_ne!(d, d3);
}
}