use crate::actor::ActorId;
use crate::types::{RegionId, TaskId, Time};
use std::collections::{HashMap, HashSet};
use std::fmt;
/// A tracked reply that `ReplyLinearityOracle::check` found to be either
/// never resolved or resolved more than once.
#[derive(Debug, Clone)]
pub struct ReplyLinearityViolation {
/// Server actor of the offending entry.
pub server: ActorId,
/// Task of the offending entry.
pub task: TaskId,
/// `true` when the reply was never sent or aborted; `false` when it was
/// resolved more than once.
pub dropped: bool,
/// Time that was passed to `on_reply_created` for this entry.
pub call_time: Time,
}
impl fmt::Display for ReplyLinearityViolation {
    /// Writes a one-line description naming the failure kind, the server,
    /// the task and the recorded call time.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            server,
            task,
            dropped,
            call_time,
        } = self;
        // `dropped` distinguishes "never resolved" from "resolved more than once".
        let label = match *dropped {
            true => "dropped",
            false => "double-sent",
        };
        f.write_fmt(format_args!(
            "Reply {} by server {:?} (task {:?}) at {:?}",
            label, server, task, call_time
        ))
    }
}
// Empty marker impl: lets the violation be used wherever a `std::error::Error` is required.
impl std::error::Error for ReplyLinearityViolation {}
/// Tracks created replies and how often each one was resolved (sent or
/// aborted), so that `check` can report replies resolved zero times or more
/// than once.
#[derive(Debug, Default)]
pub struct ReplyLinearityOracle {
/// Tracked replies keyed by `(server, task)`. Each value is
/// `(creation time, resolved at least once, resolved more than once)`.
pending: HashMap<(ActorId, TaskId), (Time, bool, bool)>,
/// Number of `on_reply_created` calls since construction or the last `reset`.
created_count: usize,
/// Number of `on_reply_sent` / `on_reply_aborted` calls since construction
/// or the last `reset`, including calls for keys that are not tracked.
resolved_count: usize,
}
impl ReplyLinearityOracle {
    /// Returns an oracle that tracks no replies and has both counters at zero.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Begins tracking the reply owed by `server` for `task`, stamped with `time`.
    ///
    /// Any entry already stored under the same `(server, task)` key is
    /// replaced by a fresh, unresolved one.
    pub fn on_reply_created(&mut self, server: ActorId, task: TaskId, time: Time) {
        let key = (server, task);
        let unresolved = (time, false, false);
        self.pending.insert(key, unresolved);
        self.created_count += 1;
    }

    /// Records that the reply for `(server, task)` was sent.
    ///
    /// The resolution counter advances even when no such reply is tracked.
    pub fn on_reply_sent(&mut self, server: ActorId, task: TaskId) {
        self.record_resolution(server, task);
    }

    /// Records that the reply for `(server, task)` was aborted.
    ///
    /// An abort is bookkept exactly like a send.
    pub fn on_reply_aborted(&mut self, server: ActorId, task: TaskId) {
        self.record_resolution(server, task);
    }

    /// Verifies that every tracked reply was resolved exactly once.
    ///
    /// Entries are visited in ascending order of `(server.task_id(), task)`
    /// and only the first offending entry is reported.
    ///
    /// # Errors
    ///
    /// Returns a [`ReplyLinearityViolation`] with `dropped: false` for an
    /// entry resolved more than once, or `dropped: true` for an entry that
    /// was never resolved.
    pub fn check(&self) -> Result<(), ReplyLinearityViolation> {
        let mut entries: Vec<_> = self.pending.iter().collect();
        entries.sort_by_key(|((server, task), _)| (server.task_id(), *task));

        for (&(server, task), &(call_time, resolved, over_resolved)) in entries {
            let dropped = match (resolved, over_resolved) {
                // Resolved more than once: takes priority over every other state.
                (_, true) => false,
                // Never resolved.
                (false, false) => true,
                // Resolved exactly once: nothing to report.
                (true, false) => continue,
            };
            return Err(ReplyLinearityViolation {
                server,
                task,
                dropped,
                call_time,
            });
        }
        Ok(())
    }

    /// Forgets every tracked reply and zeroes both counters.
    pub fn reset(&mut self) {
        self.created_count = 0;
        self.resolved_count = 0;
        self.pending.clear();
    }

    /// Number of `on_reply_created` calls since construction or the last reset.
    #[must_use]
    pub fn created_count(&self) -> usize {
        self.created_count
    }

    /// Number of send/abort notifications since construction or the last reset.
    #[must_use]
    pub fn resolved_count(&self) -> usize {
        self.resolved_count
    }

    /// Shared bookkeeping for send and abort notifications.
    fn record_resolution(&mut self, server: ActorId, task: TaskId) {
        if let Some((_, resolved, over_resolved)) = self.pending.get_mut(&(server, task)) {
            // A resolution arriving after an earlier one marks the entry as
            // over-resolved; the first one only marks it resolved.
            *over_resolved |= *resolved;
            *resolved = true;
        }
        self.resolved_count += 1;
    }
}
/// A name lease that `RegistryLeaseOracle::check` found still unresolved
/// (neither released nor aborted).
#[derive(Debug, Clone)]
pub struct RegistryLeaseViolation {
/// Name the lease was recorded under.
pub name: String,
/// Task recorded as the holder when the lease was acquired.
pub holder: TaskId,
/// Region recorded when the lease was acquired.
pub region: RegionId,
/// Time that was passed to `on_lease_acquired` for this lease.
pub acquired_at: Time,
}
impl fmt::Display for RegistryLeaseViolation {
    /// Writes a one-line description with the lease name, its holder, the
    /// region and the acquisition time.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            name,
            holder,
            region,
            acquired_at,
        } = self;
        f.write_fmt(format_args!(
            "Stale name lease \"{}\" held by {:?} in region {:?} (acquired at {:?})",
            name, holder, region, acquired_at
        ))
    }
}
// Empty marker impl: lets the violation be used wherever a `std::error::Error` is required.
impl std::error::Error for RegistryLeaseViolation {}
/// Tracks name leases and whether each one was resolved (released or
/// aborted), so that `check` can report leases left unresolved.
#[derive(Debug, Default)]
pub struct RegistryLeaseOracle {
/// Leases keyed by name. Each value is
/// `(holder, region, acquisition time, resolved)`.
leases: HashMap<String, (TaskId, RegionId, Time, bool)>,
/// Number of `on_lease_acquired` calls since construction or the last `reset`.
acquired_count: usize,
/// Number of `on_lease_released` / `on_lease_aborted` calls since
/// construction or the last `reset`, including calls for unknown names.
resolved_count: usize,
}
impl RegistryLeaseOracle {
    /// Returns an oracle that tracks no leases and has both counters at zero.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Records that `holder` acquired the lease `name` in `region` at `time`.
    ///
    /// Any lease already stored under the same name is replaced by the new,
    /// unresolved one.
    pub fn on_lease_acquired(
        &mut self,
        name: impl Into<String>,
        holder: TaskId,
        region: RegionId,
        time: Time,
    ) {
        let key: String = name.into();
        let unresolved = (holder, region, time, false);
        self.leases.insert(key, unresolved);
        self.acquired_count += 1;
    }

    /// Records that the lease `name` was released.
    ///
    /// The resolution counter advances even when no such lease is tracked.
    pub fn on_lease_released(&mut self, name: &str) {
        self.mark_resolved(name);
    }

    /// Records that the lease `name` was aborted.
    ///
    /// An abort is bookkept exactly like a release.
    pub fn on_lease_aborted(&mut self, name: &str) {
        self.mark_resolved(name);
    }

    /// Verifies that every tracked lease was released or aborted.
    ///
    /// Leases are visited in ascending name order and only the first
    /// unresolved one is reported.
    ///
    /// # Errors
    ///
    /// Returns a [`RegistryLeaseViolation`] describing the first unresolved lease.
    pub fn check(&self) -> Result<(), RegistryLeaseViolation> {
        let mut entries: Vec<_> = self.leases.iter().collect();
        entries.sort_by(|left, right| left.0.cmp(right.0));

        let stale = entries.into_iter().find(|(_, lease)| !lease.3);
        match stale {
            Some((name, &(holder, region, acquired_at, _))) => Err(RegistryLeaseViolation {
                name: name.clone(),
                holder,
                region,
                acquired_at,
            }),
            None => Ok(()),
        }
    }

    /// Forgets every tracked lease and zeroes both counters.
    pub fn reset(&mut self) {
        self.acquired_count = 0;
        self.resolved_count = 0;
        self.leases.clear();
    }

    /// Number of `on_lease_acquired` calls since construction or the last reset.
    #[must_use]
    pub fn acquired_count(&self) -> usize {
        self.acquired_count
    }

    /// Number of release/abort notifications since construction or the last reset.
    #[must_use]
    pub fn resolved_count(&self) -> usize {
        self.resolved_count
    }

    /// Shared bookkeeping for release and abort notifications.
    fn mark_resolved(&mut self, name: &str) {
        if let Some((_, _, _, resolved)) = self.leases.get_mut(name) {
            *resolved = true;
        }
        self.resolved_count += 1;
    }
}
/// A monitor whose recorded DOWN deliveries were not in sorted order, as
/// reported by `DownOrderOracle::check`.
#[derive(Debug, Clone)]
pub struct DownOrderViolation {
/// Monitor whose delivery sequence was out of order.
pub monitor: TaskId,
/// The delivered subjects, sorted ascending.
pub expected: Vec<TaskId>,
/// The subjects in the order they were recorded.
pub actual: Vec<TaskId>,
}
impl fmt::Display for DownOrderViolation {
    /// Writes a one-line description with the monitor, the sorted sequence
    /// and the sequence that was actually recorded.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            monitor,
            expected,
            actual,
        } = self;
        f.write_fmt(format_args!(
            "Non-deterministic DOWN order for monitor {:?}: expected {:?}, got {:?}",
            monitor, expected, actual
        ))
    }
}
// Empty marker impl: lets the violation be used wherever a `std::error::Error` is required.
impl std::error::Error for DownOrderViolation {}
/// Records, per monitor, the order in which DOWN notifications were
/// delivered, so that `check` can report sequences that are not sorted.
#[derive(Debug, Default)]
pub struct DownOrderOracle {
/// For each monitor, the subjects in the order `on_down_delivered` was called.
delivery_sequences: HashMap<TaskId, Vec<TaskId>>,
/// Number of `on_down_delivered` calls since construction or the last `reset`.
down_count: usize,
}
impl DownOrderOracle {
    /// Returns an oracle with no recorded deliveries.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Appends `subject` to the delivery sequence recorded for `monitor`.
    pub fn on_down_delivered(&mut self, monitor: TaskId, subject: TaskId) {
        let sequence = self.delivery_sequences.entry(monitor).or_default();
        sequence.push(subject);
        self.down_count += 1;
    }

    /// Verifies that every monitor received its DOWN notifications in
    /// ascending subject order.
    ///
    /// Monitors are visited in ascending order and only the first offending
    /// one is reported.
    ///
    /// # Errors
    ///
    /// Returns a [`DownOrderViolation`] carrying the recorded sequence and
    /// its sorted counterpart.
    pub fn check(&self) -> Result<(), DownOrderViolation> {
        let mut sequences: Vec<_> = self.delivery_sequences.iter().collect();
        sequences.sort_by_key(|&(monitor, _)| *monitor);

        for (&monitor, actual) in sequences {
            // A sequence equals its sorted copy exactly when no element is
            // followed by a smaller one, so the copy is only built on failure.
            let in_order = actual
                .iter()
                .zip(actual.iter().skip(1))
                .all(|(earlier, later)| earlier <= later);
            if in_order {
                continue;
            }
            let mut expected = actual.clone();
            expected.sort();
            return Err(DownOrderViolation {
                monitor,
                expected,
                actual: actual.clone(),
            });
        }
        Ok(())
    }

    /// Forgets every recorded delivery and zeroes the delivery counter.
    pub fn reset(&mut self) {
        self.down_count = 0;
        self.delivery_sequences.clear();
    }

    /// Number of distinct monitors with at least one recorded delivery.
    #[must_use]
    pub fn monitor_count(&self) -> usize {
        self.delivery_sequences.len()
    }

    /// Number of `on_down_delivered` calls since construction or the last reset.
    #[must_use]
    pub fn down_count(&self) -> usize {
        self.down_count
    }
}
/// A supervisor whose region was recorded as closed while some of its
/// children had no completion recorded at or before the close time, as
/// reported by `SupervisorQuiescenceOracle::check`.
#[derive(Debug, Clone)]
pub struct SupervisorQuiescenceViolation {
/// Supervisor whose children were still active.
pub supervisor: TaskId,
/// Region the supervisor was registered with.
pub region: RegionId,
/// Children with no recorded completion, or one later than `close_time`,
/// sorted ascending.
pub active_children: Vec<TaskId>,
/// Close time recorded for `region`.
pub close_time: Time,
}
impl fmt::Display for SupervisorQuiescenceViolation {
    /// Writes a one-line description with the supervisor, its region, the
    /// close time, and the number and identities of the active children.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            supervisor,
            region,
            active_children,
            close_time,
        } = self;
        let active_total = active_children.len();
        f.write_fmt(format_args!(
            "Supervisor {:?} (region {:?}) closed at {:?} with {} active children: {:?}",
            supervisor, region, close_time, active_total, active_children
        ))
    }
}
// Empty marker impl: lets the violation be used wherever a `std::error::Error` is required.
impl std::error::Error for SupervisorQuiescenceViolation {}
/// Tracks supervisors, their children, task completions and region closes,
/// so that `check` can report supervisors whose region closed while
/// children were still active.
#[derive(Debug, Default)]
pub struct SupervisorQuiescenceOracle {
/// For each supervisor, the region it was registered with and its children.
supervisors: HashMap<TaskId, (RegionId, HashSet<TaskId>)>,
/// Completion time recorded for each completed task.
completed_tasks: HashMap<TaskId, Time>,
/// Close time recorded for each closed region.
closed_regions: HashMap<RegionId, Time>,
/// Number of `on_child_added` calls since construction or the last `reset`.
child_count: usize,
}
impl SupervisorQuiescenceOracle {
#[must_use]
pub fn new() -> Self {
Self::default()
}
pub fn on_supervisor_created(&mut self, supervisor: TaskId, region: RegionId) {
self.supervisors
.entry(supervisor)
.or_insert_with(|| (region, HashSet::new()));
}
pub fn on_child_added(&mut self, supervisor: TaskId, child: TaskId) {
if let Some((_, children)) = self.supervisors.get_mut(&supervisor) {
children.insert(child);
}
self.child_count += 1;
}
pub fn on_task_completed(&mut self, task: TaskId, time: Time) {
self.completed_tasks
.entry(task)
.and_modify(|completed_at| {
if time < *completed_at {
*completed_at = time;
}
})
.or_insert(time);
}
pub fn on_region_closed(&mut self, region: RegionId, time: Time) {
self.closed_regions.insert(region, time);
}
pub fn check(&self) -> Result<(), SupervisorQuiescenceViolation> {
let mut sups: Vec<_> = self.supervisors.keys().copied().collect();
sups.sort();
for supervisor in sups {
if let Some((region, children)) = self.supervisors.get(&supervisor) {
if let Some(&close_time) = self.closed_regions.get(region) {
let mut active: Vec<TaskId> = children
.iter()
.copied()
.filter(|c| {
self.completed_tasks
.get(c)
.is_none_or(|completed_at| *completed_at > close_time)
})
.collect();
active.sort();
if !active.is_empty() {
return Err(SupervisorQuiescenceViolation {
supervisor,
region: *region,
active_children: active,
close_time,
});
}
}
}
}
Ok(())
}
pub fn reset(&mut self) {
self.supervisors.clear();
self.completed_tasks.clear();
self.closed_regions.clear();
self.child_count = 0;
}
#[must_use]
pub fn supervisor_count(&self) -> usize {
self.supervisors.len()
}
#[must_use]
pub fn child_count(&self) -> usize {
self.child_count
}
#[must_use]
pub fn closed_region_count(&self) -> usize {
self.closed_regions.len()
}
}
// Unit tests for the four oracles in this file: each oracle gets pass,
// fail and reset cases.
#[cfg(test)]
mod tests {
use super::*;
use crate::types::TaskId;
// Builds a test `TaskId` from `index` (second argument fixed at 0).
fn task(index: u32) -> TaskId {
TaskId::new_for_test(index, 0)
}
// Builds an `ActorId` from the test task with the same `index`.
fn actor(index: u32) -> ActorId {
ActorId::from_task(task(index))
}
// Builds a test `RegionId` from `index` (second argument fixed at 0).
fn region(index: u32) -> RegionId {
RegionId::new_for_test(index, 0)
}
// A reply that is created and then sent passes the check.
#[test]
fn reply_linearity_pass_when_all_resolved() {
let mut oracle = ReplyLinearityOracle::new();
oracle.on_reply_created(actor(1), task(1), Time::ZERO);
oracle.on_reply_sent(actor(1), task(1));
assert!(oracle.check().is_ok());
}
// A reply that is created but never resolved is reported as dropped.
#[test]
fn reply_linearity_fail_on_dropped_reply() {
let mut oracle = ReplyLinearityOracle::new();
oracle.on_reply_created(actor(1), task(1), Time::ZERO);
let err = oracle.check().unwrap_err();
assert!(err.dropped);
}
// Aborting a reply counts as resolving it.
#[test]
fn reply_linearity_pass_on_aborted_reply() {
let mut oracle = ReplyLinearityOracle::new();
oracle.on_reply_created(actor(1), task(1), Time::ZERO);
oracle.on_reply_aborted(actor(1), task(1));
assert!(oracle.check().is_ok());
}
// Sending and then aborting the same reply is reported with `dropped == false`.
#[test]
fn reply_linearity_fail_on_double_resolution() {
let mut oracle = ReplyLinearityOracle::new();
oracle.on_reply_created(actor(1), task(1), Time::ZERO);
oracle.on_reply_sent(actor(1), task(1));
oracle.on_reply_aborted(actor(1), task(1));
let err = oracle.check().unwrap_err();
assert!(!err.dropped);
}
// `reset` drops the pending reply and zeroes the created counter.
#[test]
fn reply_linearity_reset_clears_state() {
let mut oracle = ReplyLinearityOracle::new();
oracle.on_reply_created(actor(1), task(1), Time::ZERO);
oracle.reset();
assert!(oracle.check().is_ok());
assert_eq!(oracle.created_count(), 0);
}
// A lease that is acquired and then released passes the check.
#[test]
fn registry_lease_pass_when_all_resolved() {
let mut oracle = RegistryLeaseOracle::new();
oracle.on_lease_acquired("svc", task(1), region(0), Time::ZERO);
oracle.on_lease_released("svc");
assert!(oracle.check().is_ok());
}
// A lease that is never released or aborted is reported by name.
#[test]
fn registry_lease_fail_on_unreleased_lease() {
let mut oracle = RegistryLeaseOracle::new();
oracle.on_lease_acquired("leaked_svc", task(1), region(0), Time::ZERO);
let err = oracle.check().unwrap_err();
assert_eq!(err.name, "leaked_svc");
}
// Aborting a lease counts as resolving it.
#[test]
fn registry_lease_pass_on_aborted_lease() {
let mut oracle = RegistryLeaseOracle::new();
oracle.on_lease_acquired("temp", task(1), region(0), Time::ZERO);
oracle.on_lease_aborted("temp");
assert!(oracle.check().is_ok());
}
// `reset` drops the outstanding lease and zeroes the acquired counter.
#[test]
fn registry_lease_reset_clears_state() {
let mut oracle = RegistryLeaseOracle::new();
oracle.on_lease_acquired("name", task(1), region(0), Time::ZERO);
oracle.reset();
assert!(oracle.check().is_ok());
assert_eq!(oracle.acquired_count(), 0);
}
// Deliveries recorded in ascending subject order pass the check.
#[test]
fn down_order_pass_when_sorted() {
let mut oracle = DownOrderOracle::new();
oracle.on_down_delivered(task(10), task(1));
oracle.on_down_delivered(task(10), task(2));
oracle.on_down_delivered(task(10), task(3));
assert!(oracle.check().is_ok());
}
// Deliveries recorded out of order are reported for the affected monitor.
#[test]
fn down_order_fail_when_unsorted() {
let mut oracle = DownOrderOracle::new();
oracle.on_down_delivered(task(10), task(3));
oracle.on_down_delivered(task(10), task(1));
let err = oracle.check().unwrap_err();
assert_eq!(err.monitor, task(10));
}
// An oracle with no recorded deliveries passes the check.
#[test]
fn down_order_pass_with_empty() {
let oracle = DownOrderOracle::new();
assert!(oracle.check().is_ok());
}
// `reset` drops the out-of-order sequence and zeroes the delivery counter.
#[test]
fn down_order_reset_clears_state() {
let mut oracle = DownOrderOracle::new();
oracle.on_down_delivered(task(10), task(3));
oracle.on_down_delivered(task(10), task(1));
oracle.reset();
assert!(oracle.check().is_ok());
assert_eq!(oracle.down_count(), 0);
}
// All children completed at the close time: the check passes.
#[test]
fn supervisor_quiescence_pass_when_all_completed() {
let mut oracle = SupervisorQuiescenceOracle::new();
oracle.on_supervisor_created(task(1), region(0));
oracle.on_child_added(task(1), task(2));
oracle.on_child_added(task(1), task(3));
oracle.on_task_completed(task(2), Time::ZERO);
oracle.on_task_completed(task(3), Time::ZERO);
oracle.on_region_closed(region(0), Time::ZERO);
assert!(oracle.check().is_ok());
}
// A child with no recorded completion is reported once the region is closed.
#[test]
fn supervisor_quiescence_fail_with_active_child() {
let mut oracle = SupervisorQuiescenceOracle::new();
oracle.on_supervisor_created(task(1), region(0));
oracle.on_child_added(task(1), task(2));
oracle.on_region_closed(region(0), Time::ZERO);
let err = oracle.check().unwrap_err();
assert_eq!(err.active_children, vec![task(2)]);
}
// Without a recorded region close, an incomplete child is not a violation.
#[test]
fn supervisor_quiescence_pass_when_region_not_closed() {
let mut oracle = SupervisorQuiescenceOracle::new();
oracle.on_supervisor_created(task(1), region(0));
oracle.on_child_added(task(1), task(2));
assert!(oracle.check().is_ok());
}
// A child whose completion time is later than the close time is still reported.
#[test]
fn supervisor_quiescence_fail_when_child_completes_after_close() {
let mut oracle = SupervisorQuiescenceOracle::new();
oracle.on_supervisor_created(task(1), region(0));
oracle.on_child_added(task(1), task(2));
oracle.on_region_closed(region(0), Time::from_nanos(100));
oracle.on_task_completed(task(2), Time::from_nanos(101));
let err = oracle.check().unwrap_err();
assert_eq!(err.active_children, vec![task(2)]);
assert_eq!(err.close_time, Time::from_nanos(100));
}
// `reset` drops the violating state and leaves no supervisors registered.
#[test]
fn supervisor_quiescence_reset_clears_state() {
let mut oracle = SupervisorQuiescenceOracle::new();
oracle.on_supervisor_created(task(1), region(0));
oracle.on_child_added(task(1), task(2));
oracle.on_region_closed(region(0), Time::ZERO);
oracle.reset();
assert!(oracle.check().is_ok());
assert_eq!(oracle.supervisor_count(), 0);
}
}