pub mod actor;
pub mod ambient_authority;
pub mod cancel_correctness;
pub mod cancel_debt;
pub mod cancel_signal_ordering;
pub mod cancellation_protocol;
pub mod channel_atomicity;
pub mod deadline_monotone;
pub mod determinism;
pub mod eprocess;
pub mod evidence;
#[cfg(feature = "messaging-fabric")]
pub mod fabric;
pub mod finalizer;
pub mod loser_drain;
pub mod obligation_leak;
pub mod priority_inversion;
pub mod quiescence;
pub mod region_leak;
pub mod region_tree;
pub mod rref_access;
pub mod runtime_epoch;
pub mod spork;
pub mod task_leak;
pub mod waker_dedup;
pub use actor::{
ActorLeakOracle, ActorLeakViolation, MailboxOracle, MailboxViolation, MailboxViolationKind,
SupervisionOracle, SupervisionViolation, SupervisionViolationKind,
};
pub use ambient_authority::{
AmbientAuthorityOracle, AmbientAuthorityViolation, CapabilityKind, CapabilitySet,
};
pub use cancel_correctness::{
CancelCorrectnessConfig, CancelCorrectnessOracle, CancelCorrectnessStatistics,
CancelCorrectnessViolation, InvalidInitialWitnessKind,
};
pub use cancel_debt::{
CancelDebtConfig, CancelDebtOracle, CancelDebtStatistics, CancelDebtViolation,
};
pub use cancel_signal_ordering::{
CancelOrderingConfig, CancelOrderingOracle, CancelOrderingStatistics, CancelOrderingViolation,
};
pub use cancellation_protocol::{
CancellationProtocolOracle, CancellationProtocolViolation, TaskStateKind,
};
pub use channel_atomicity::{
ChannelAtomicityConfig, ChannelAtomicityOracle, ChannelAtomicityStatistics,
ChannelAtomicityViolation, ChannelId, EnforcementMode, ReservationId, ViolationRecord,
WakerId as ChannelWakerId,
};
pub use deadline_monotone::{DeadlineMonotoneOracle, DeadlineMonotoneViolation};
pub use determinism::{
DeterminismOracle, DeterminismViolation, TraceEventSummary, assert_deterministic,
assert_deterministic_multi,
};
pub use eprocess::{EProcess, EProcessConfig, EProcessMonitor, EValue, MonitorResult};
pub use evidence::{
BayesFactor, DetectionModel, EvidenceEntry, EvidenceLedger, EvidenceLine, EvidenceStrength,
EvidenceSummary, LogLikelihoodContributions,
};
#[cfg(feature = "messaging-fabric")]
pub use fabric::{
FabricPublishOracle, FabricPublishViolation, FabricQuiescenceOracle, FabricQuiescenceViolation,
FabricRedeliveryOracle, FabricRedeliveryViolation, FabricReplyOracle, FabricReplyViolation,
};
pub use finalizer::{FinalizerId, FinalizerOracle, FinalizerViolation};
pub use loser_drain::{LoserDrainOracle, LoserDrainViolation};
pub use obligation_leak::{ObligationLeakOracle, ObligationLeakViolation};
pub use priority_inversion::{
InversionId, InversionType, Priority, PriorityInversion, PriorityInversionConfig,
PriorityInversionOracle, PriorityInversionStatistics, ResourceId,
};
pub use quiescence::{QuiescenceOracle, QuiescenceViolation};
pub use region_leak::{
BudgetInfo, RegionLeakConfig, RegionLeakOracle, RegionLeakStatistics, RegionLifecycleState,
RegionState as RegionLeakState, RegionViolation, TaskLifecycleState, TaskState,
ViolationContext, ViolationType,
};
pub use region_tree::{RegionTreeEntry, RegionTreeOracle, RegionTreeViolation};
pub use rref_access::{RRefAccessOracle, RRefAccessViolation, RRefAccessViolationKind, RRefId};
pub use runtime_epoch::{
ConsistencyLevel, RuntimeEpochConfig, RuntimeEpochOracle, RuntimeEpochStatistics,
RuntimeEpochViolation, RuntimeModule,
};
pub use spork::{
DownOrderOracle, DownOrderViolation, RegistryLeaseOracle, RegistryLeaseViolation,
ReplyLinearityOracle, ReplyLinearityViolation, SupervisorQuiescenceOracle,
SupervisorQuiescenceViolation,
};
pub use task_leak::{TaskLeakOracle, TaskLeakViolation};
pub use waker_dedup::{
ViolationRecord as WakerViolationRecord, WakerDedupConfig, WakerDedupOracle,
WakerDedupStatistics, WakerDedupViolation, WakerId as WakerDedupId, WakerStatus,
};
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Write as _;
use crate::obligation::dialectica::ContractChecker;
use crate::obligation::marking::{MarkingAnalyzer, MarkingEvent};
use crate::obligation::no_aliasing_proof::NoAliasingProver;
use crate::record::region::RegionState;
use crate::runtime::RuntimeState;
use crate::types::Time;
#[derive(Debug, Clone)]
pub enum OracleViolation {
TaskLeak(TaskLeakViolation),
ObligationLeak(ObligationLeakViolation),
Quiescence(QuiescenceViolation),
LoserDrain(LoserDrainViolation),
Finalizer(FinalizerViolation),
RegionTree(RegionTreeViolation),
RegionLeak(RegionViolation),
AmbientAuthority(AmbientAuthorityViolation),
DeadlineMonotone(DeadlineMonotoneViolation),
CancellationProtocol(CancellationProtocolViolation),
CancelCorrectness(CancelCorrectnessViolation),
CancelDebt(CancelDebtViolation),
CancelOrdering(CancelOrderingViolation),
RuntimeEpoch(RuntimeEpochViolation),
ChannelAtomicity(ChannelAtomicityViolation),
WakerDedup(WakerDedupViolation),
ActorLeak(ActorLeakViolation),
Supervision(SupervisionViolation),
Mailbox(MailboxViolation),
RRefAccess(RRefAccessViolation),
ReplyLinearity(ReplyLinearityViolation),
RegistryLease(RegistryLeaseViolation),
DownOrder(DownOrderViolation),
SupervisorQuiescence(SupervisorQuiescenceViolation),
#[cfg(feature = "messaging-fabric")]
FabricPublish(FabricPublishViolation),
#[cfg(feature = "messaging-fabric")]
FabricReply(FabricReplyViolation),
#[cfg(feature = "messaging-fabric")]
FabricQuiescence(FabricQuiescenceViolation),
#[cfg(feature = "messaging-fabric")]
FabricRedelivery(FabricRedeliveryViolation),
PriorityInversion(PriorityInversion),
}
impl std::fmt::Display for OracleViolation {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::TaskLeak(v) => write!(f, "Task leak: {v}"),
Self::ObligationLeak(v) => write!(f, "Obligation leak: {v}"),
Self::Quiescence(v) => write!(f, "Quiescence violation: {v}"),
Self::LoserDrain(v) => write!(f, "Loser drain violation: {v}"),
Self::Finalizer(v) => write!(f, "Finalizer violation: {v}"),
Self::RegionTree(v) => write!(f, "Region tree violation: {v}"),
Self::RegionLeak(v) => write!(f, "Region leak violation: {v}"),
Self::AmbientAuthority(v) => write!(f, "Ambient authority violation: {v}"),
Self::DeadlineMonotone(v) => write!(f, "Deadline monotonicity violation: {v}"),
Self::CancellationProtocol(v) => write!(f, "Cancellation protocol violation: {v}"),
Self::CancelCorrectness(v) => write!(f, "Cancel-correctness violation: {v}"),
Self::CancelDebt(v) => write!(f, "Cancel debt violation: {v}"),
Self::CancelOrdering(v) => write!(f, "Cancel ordering violation: {v}"),
Self::RuntimeEpoch(v) => write!(f, "Runtime epoch violation: {v}"),
Self::ChannelAtomicity(v) => write!(f, "Channel atomicity violation: {v}"),
Self::WakerDedup(v) => write!(f, "Waker deduplication violation: {v}"),
Self::ActorLeak(v) => write!(f, "Actor leak: {v}"),
Self::Supervision(v) => write!(f, "Supervision violation: {v}"),
Self::Mailbox(v) => write!(f, "Mailbox violation: {v}"),
Self::RRefAccess(v) => write!(f, "RRef access violation: {v}"),
Self::ReplyLinearity(v) => write!(f, "Reply linearity violation: {v}"),
Self::RegistryLease(v) => write!(f, "Registry lease violation: {v}"),
Self::DownOrder(v) => write!(f, "DOWN order violation: {v}"),
Self::SupervisorQuiescence(v) => write!(f, "Supervisor quiescence violation: {v}"),
#[cfg(feature = "messaging-fabric")]
Self::FabricPublish(v) => write!(f, "FABRIC publish violation: {v}"),
#[cfg(feature = "messaging-fabric")]
Self::FabricReply(v) => write!(f, "FABRIC reply violation: {v}"),
#[cfg(feature = "messaging-fabric")]
Self::FabricQuiescence(v) => write!(f, "FABRIC quiescence violation: {v}"),
#[cfg(feature = "messaging-fabric")]
Self::FabricRedelivery(v) => write!(f, "FABRIC redelivery violation: {v}"),
Self::PriorityInversion(v) => write!(
f,
"Priority inversion: Task {:?}(P{:?}) blocked by Task {:?}(P{:?}) on Resource {:?} for {:?}",
v.blocked_task,
v.blocked_priority,
v.blocking_task,
v.blocking_priority,
v.resource_id,
v.duration.unwrap_or_else(|| v.start_time.elapsed())
),
}
}
}
impl std::error::Error for OracleViolation {}
#[derive(Debug, Default)]
pub struct OracleSuite {
pub task_leak: TaskLeakOracle,
pub obligation_leak: ObligationLeakOracle,
pub quiescence: QuiescenceOracle,
pub loser_drain: LoserDrainOracle,
pub finalizer: FinalizerOracle,
pub region_tree: RegionTreeOracle,
pub region_leak: RegionLeakOracle,
pub ambient_authority: AmbientAuthorityOracle,
pub deadline_monotone: DeadlineMonotoneOracle,
pub cancellation_protocol: CancellationProtocolOracle,
pub cancel_correctness: CancelCorrectnessOracle,
pub cancel_debt: CancelDebtOracle,
pub cancel_signal_ordering: CancelOrderingOracle,
pub runtime_epoch: RuntimeEpochOracle,
pub channel_atomicity: ChannelAtomicityOracle,
pub waker_dedup: WakerDedupOracle,
pub actor_leak: ActorLeakOracle,
pub supervision: SupervisionOracle,
pub mailbox: MailboxOracle,
pub rref_access: RRefAccessOracle,
pub reply_linearity: ReplyLinearityOracle,
pub registry_lease: RegistryLeaseOracle,
pub down_order: DownOrderOracle,
pub supervisor_quiescence: SupervisorQuiescenceOracle,
#[cfg(feature = "messaging-fabric")]
pub fabric_publish: FabricPublishOracle,
#[cfg(feature = "messaging-fabric")]
pub fabric_reply: FabricReplyOracle,
#[cfg(feature = "messaging-fabric")]
pub fabric_quiescence: FabricQuiescenceOracle,
#[cfg(feature = "messaging-fabric")]
pub fabric_redelivery: FabricRedeliveryOracle,
pub eprocess_monitor: Option<EProcessMonitor>,
}
impl OracleSuite {
#[must_use]
pub fn new() -> Self {
Self {
eprocess_monitor: Some(EProcessMonitor::standard()),
..Self::default()
}
}
#[allow(clippy::too_many_lines)]
pub fn hydrate_temporal_from_state(&mut self, state: &RuntimeState, now: Time) {
#[derive(Clone, Copy)]
struct RegionSnapshot {
id: crate::types::RegionId,
parent: Option<crate::types::RegionId>,
state: RegionState,
budget: crate::types::Budget,
created_at: Time,
}
fn walk_regions(
id: crate::types::RegionId,
children: &BTreeMap<crate::types::RegionId, Vec<crate::types::RegionId>>,
seen: &mut BTreeSet<crate::types::RegionId>,
pre_order: &mut Vec<crate::types::RegionId>,
post_order: &mut Vec<crate::types::RegionId>,
) {
if !seen.insert(id) {
return;
}
pre_order.push(id);
if let Some(kids) = children.get(&id) {
for &child in kids {
walk_regions(child, children, seen, pre_order, post_order);
}
}
post_order.push(id);
}
self.task_leak.reset();
self.obligation_leak.snapshot_from_state(state, now);
self.quiescence.reset();
self.finalizer.reset();
self.region_tree.reset();
self.deadline_monotone.reset();
self.cancellation_protocol.snapshot_from_state(state, now);
self.cancel_correctness.reset();
self.cancel_debt.reset();
self.cancel_signal_ordering.reset();
self.runtime_epoch.reset();
for event in state.finalizer_history() {
match *event {
crate::runtime::state::FinalizerHistoryEvent::Registered { id, region, time } => {
self.finalizer.on_register(FinalizerId(id), region, time);
}
crate::runtime::state::FinalizerHistoryEvent::Ran { id, time } => {
self.finalizer.on_run(FinalizerId(id), time);
}
crate::runtime::state::FinalizerHistoryEvent::RegionClosed { region, time } => {
self.finalizer.on_region_close(region, time);
}
}
}
let mut regions: BTreeMap<crate::types::RegionId, RegionSnapshot> = BTreeMap::new();
let mut children: BTreeMap<crate::types::RegionId, Vec<crate::types::RegionId>> =
BTreeMap::new();
for (_, region) in state.regions_iter() {
let snapshot = RegionSnapshot {
id: region.id,
parent: region.parent,
state: region.state(),
budget: region.budget(),
created_at: region.created_at(),
};
regions.insert(snapshot.id, snapshot);
children.entry(snapshot.id).or_default();
}
for snapshot in regions.values() {
if let Some(parent) = snapshot.parent {
children.entry(parent).or_default().push(snapshot.id);
}
}
for kids in children.values_mut() {
kids.sort();
}
let mut roots = Vec::new();
for (id, snapshot) in ®ions {
if snapshot
.parent
.is_none_or(|parent| !regions.contains_key(&parent))
{
roots.push(*id);
}
}
let mut pre_order = Vec::new();
let mut post_order = Vec::new();
let mut seen = BTreeSet::new();
for root in roots {
walk_regions(root, &children, &mut seen, &mut pre_order, &mut post_order);
}
for &id in regions.keys() {
walk_regions(id, &children, &mut seen, &mut pre_order, &mut post_order);
}
for region_id in &pre_order {
let Some(snapshot) = regions.get(region_id) else {
continue;
};
self.region_tree
.on_region_create(snapshot.id, snapshot.parent, snapshot.created_at);
self.quiescence
.on_region_create(snapshot.id, snapshot.parent);
self.deadline_monotone.on_region_create(
snapshot.id,
snapshot.parent,
&snapshot.budget,
now,
);
}
let mut tasks = Vec::new();
for (_, task) in state.tasks_iter() {
tasks.push((task.id, task.owner, task.state.is_terminal()));
}
tasks.sort_by_key(|(task, _, _)| *task);
for (task_id, region_id, terminal) in tasks {
self.task_leak.on_spawn(task_id, region_id, now);
self.quiescence.on_spawn(task_id, region_id);
if terminal {
self.task_leak.on_complete(task_id, now);
self.quiescence.on_task_complete(task_id);
}
}
for region_id in post_order {
let Some(snapshot) = regions.get(®ion_id) else {
continue;
};
if snapshot.state.is_terminal() {
self.task_leak.on_region_close(region_id, now);
self.quiescence.on_region_close(region_id, now);
}
}
}
#[must_use]
pub fn check_all(&mut self, now: Time) -> Vec<OracleViolation> {
let mut violations = Vec::new();
if let Err(v) = self.task_leak.check(now) {
violations.push(OracleViolation::TaskLeak(v));
}
if let Err(v) = self.obligation_leak.check(now) {
violations.push(OracleViolation::ObligationLeak(v));
}
if let Err(v) = self.quiescence.check() {
violations.push(OracleViolation::Quiescence(v));
}
if let Err(v) = self.loser_drain.check() {
violations.push(OracleViolation::LoserDrain(v));
}
if let Err(v) = self.finalizer.check() {
violations.push(OracleViolation::Finalizer(v));
}
if let Err(v) = self.region_tree.check() {
violations.push(OracleViolation::RegionTree(v));
}
let region_leak_violations = self.region_leak.check_for_violations();
if let Ok(violations_vec) = region_leak_violations {
for violation in violations_vec {
violations.push(OracleViolation::RegionLeak(violation));
}
} else {
}
if let Err(v) = self.ambient_authority.check() {
violations.push(OracleViolation::AmbientAuthority(v));
}
if let Err(v) = self.deadline_monotone.check() {
violations.push(OracleViolation::DeadlineMonotone(v));
}
if let Err(v) = self.cancellation_protocol.check() {
violations.push(OracleViolation::CancellationProtocol(v));
}
if let Err(v) = self.cancel_correctness.check(now) {
violations.push(OracleViolation::CancelCorrectness(v));
}
if let Err(v) = self.cancel_debt.check(now) {
violations.push(OracleViolation::CancelDebt(v));
}
if let Err(v) = self.cancel_signal_ordering.check(now) {
violations.push(OracleViolation::CancelOrdering(v));
}
if let Err(v) = self.runtime_epoch.check(now) {
violations.push(OracleViolation::RuntimeEpoch(v));
}
let channel_atomicity_violations = self.channel_atomicity.check_for_violations();
if let Ok(violations_vec) = channel_atomicity_violations {
for violation in violations_vec {
violations.push(OracleViolation::ChannelAtomicity(violation));
}
} else {
}
let waker_dedup_violations = self.waker_dedup.check_for_violations();
if let Ok(violations_vec) = waker_dedup_violations {
for violation in violations_vec {
violations.push(OracleViolation::WakerDedup(violation));
}
} else {
}
if let Err(v) = self.actor_leak.check(now) {
violations.push(OracleViolation::ActorLeak(v));
}
if let Err(v) = self.supervision.check(now) {
violations.push(OracleViolation::Supervision(v));
}
if let Err(v) = self.mailbox.check(now) {
violations.push(OracleViolation::Mailbox(v));
}
if let Err(v) = self.rref_access.check() {
violations.push(OracleViolation::RRefAccess(v));
}
if let Err(v) = self.reply_linearity.check() {
violations.push(OracleViolation::ReplyLinearity(v));
}
if let Err(v) = self.registry_lease.check() {
violations.push(OracleViolation::RegistryLease(v));
}
if let Err(v) = self.down_order.check() {
violations.push(OracleViolation::DownOrder(v));
}
if let Err(v) = self.supervisor_quiescence.check() {
violations.push(OracleViolation::SupervisorQuiescence(v));
}
#[cfg(feature = "messaging-fabric")]
if let Err(v) = self.fabric_publish.check() {
violations.push(OracleViolation::FabricPublish(v));
}
#[cfg(feature = "messaging-fabric")]
if let Err(v) = self.fabric_reply.check() {
violations.push(OracleViolation::FabricReply(v));
}
#[cfg(feature = "messaging-fabric")]
if let Err(v) = self.fabric_quiescence.check() {
violations.push(OracleViolation::FabricQuiescence(v));
}
#[cfg(feature = "messaging-fabric")]
if let Err(v) = self.fabric_redelivery.check() {
violations.push(OracleViolation::FabricRedelivery(v));
}
violations
}
pub fn reset(&mut self) {
self.task_leak.reset();
self.obligation_leak.reset();
self.quiescence.reset();
self.loser_drain.reset();
self.finalizer.reset();
self.region_tree.reset();
self.region_leak.reset();
self.ambient_authority.reset();
self.deadline_monotone.reset();
self.cancellation_protocol.reset();
self.cancel_correctness.reset();
self.cancel_debt.reset();
self.cancel_signal_ordering.reset();
self.runtime_epoch.reset();
self.channel_atomicity.reset();
self.waker_dedup.reset();
self.actor_leak.reset();
self.supervision.reset();
self.mailbox.reset();
self.rref_access.reset();
self.reply_linearity.reset();
self.registry_lease.reset();
self.down_order.reset();
self.supervisor_quiescence.reset();
#[cfg(feature = "messaging-fabric")]
self.fabric_publish.reset();
#[cfg(feature = "messaging-fabric")]
self.fabric_reply.reset();
#[cfg(feature = "messaging-fabric")]
self.fabric_quiescence.reset();
#[cfg(feature = "messaging-fabric")]
self.fabric_redelivery.reset();
}
#[must_use]
#[allow(clippy::too_many_lines)]
pub fn report(&mut self, now: Time) -> OracleReport {
let entries = vec![
OracleEntryReport::from_result(
"task_leak",
self.task_leak
.check(now)
.err()
.map(OracleViolation::TaskLeak),
OracleStats {
entities_tracked: self.task_leak.task_count(),
events_recorded: self.task_leak.task_count()
+ self.task_leak.completed_count()
+ self.task_leak.closed_region_count(),
},
),
OracleEntryReport::from_result(
"obligation_leak",
self.obligation_leak
.check(now)
.err()
.map(OracleViolation::ObligationLeak),
OracleStats {
entities_tracked: self.obligation_leak.obligation_count(),
events_recorded: self.obligation_leak.obligation_count()
+ self.obligation_leak.closed_region_count(),
},
),
OracleEntryReport::from_result(
"quiescence",
self.quiescence
.check()
.err()
.map(OracleViolation::Quiescence),
OracleStats {
entities_tracked: self.quiescence.region_count(),
events_recorded: self.quiescence.region_count()
+ self.quiescence.closed_count(),
},
),
OracleEntryReport::from_result(
"loser_drain",
self.loser_drain
.check()
.err()
.map(OracleViolation::LoserDrain),
OracleStats {
entities_tracked: self.loser_drain.race_count(),
events_recorded: self.loser_drain.race_count()
+ self.loser_drain.completed_race_count(),
},
),
OracleEntryReport::from_result(
"finalizer",
self.finalizer.check().err().map(OracleViolation::Finalizer),
OracleStats {
entities_tracked: self.finalizer.registered_count(),
events_recorded: self.finalizer.registered_count()
+ self.finalizer.ran_count()
+ self.finalizer.closed_region_count(),
},
),
OracleEntryReport::from_result(
"region_tree",
self.region_tree
.check()
.err()
.map(OracleViolation::RegionTree),
OracleStats {
entities_tracked: self.region_tree.region_count(),
events_recorded: self.region_tree.region_count(),
},
),
OracleEntryReport::from_result(
"region_leak",
self.region_leak
.check_for_violations()
.ok()
.and_then(|violations| violations.first().cloned())
.map(OracleViolation::RegionLeak),
OracleStats {
entities_tracked: self.region_leak.statistics().active_regions as usize,
events_recorded: (self.region_leak.statistics().total_regions_created
+ self.region_leak.statistics().total_tasks_spawned)
as usize,
},
),
OracleEntryReport::from_result(
"ambient_authority",
self.ambient_authority
.check()
.err()
.map(OracleViolation::AmbientAuthority),
OracleStats {
entities_tracked: self.ambient_authority.task_count(),
events_recorded: self.ambient_authority.task_count()
+ self.ambient_authority.effect_count(),
},
),
OracleEntryReport::from_result(
"deadline_monotone",
self.deadline_monotone
.check()
.err()
.map(OracleViolation::DeadlineMonotone),
OracleStats {
entities_tracked: self.deadline_monotone.region_count(),
events_recorded: self.deadline_monotone.region_count(),
},
),
OracleEntryReport::from_result(
"cancellation_protocol",
self.cancellation_protocol
.check()
.err()
.map(OracleViolation::CancellationProtocol),
OracleStats {
entities_tracked: self.cancellation_protocol.region_count(),
events_recorded: self.cancellation_protocol.region_count()
+ self.cancellation_protocol.cancel_count(),
},
),
OracleEntryReport::from_result(
"cancel_correctness",
self.cancel_correctness
.check(now)
.err()
.map(OracleViolation::CancelCorrectness),
OracleStats {
entities_tracked: self.cancel_correctness.get_statistics().active_tasks,
events_recorded: self.cancel_correctness.get_statistics().witnesses_processed
as usize,
},
),
OracleEntryReport::from_result(
"cancel_debt",
self.cancel_debt
.check(now)
.err()
.map(OracleViolation::CancelDebt),
OracleStats {
entities_tracked: self.cancel_debt.get_statistics().tracked_queues,
events_recorded: self.cancel_debt.get_statistics().work_items_tracked as usize,
},
),
OracleEntryReport::from_result(
"cancel_signal_ordering",
self.cancel_signal_ordering
.check(now)
.err()
.map(OracleViolation::CancelOrdering),
OracleStats {
entities_tracked: self.cancel_signal_ordering.get_statistics().tracked_signals,
events_recorded: self
.cancel_signal_ordering
.get_statistics()
.signals_processed as usize,
},
),
OracleEntryReport::from_result(
"runtime_epoch",
self.runtime_epoch
.check(now)
.err()
.map(OracleViolation::RuntimeEpoch),
OracleStats {
entities_tracked: self.runtime_epoch.get_statistics().tracked_modules,
events_recorded: self.runtime_epoch.get_statistics().transitions_tracked
as usize,
},
),
OracleEntryReport::from_result(
"channel_atomicity",
self.channel_atomicity
.check_for_violations()
.ok()
.and_then(|violations| violations.first().cloned())
.map(OracleViolation::ChannelAtomicity),
OracleStats {
entities_tracked: (self
.channel_atomicity
.statistics()
.total_reservations_created
+ self.channel_atomicity.statistics().total_wakers_registered)
as usize,
events_recorded: (self
.channel_atomicity
.statistics()
.total_reservations_created
+ self
.channel_atomicity
.statistics()
.total_reservations_committed
+ self
.channel_atomicity
.statistics()
.total_reservations_aborted
+ self.channel_atomicity.statistics().total_wakers_registered
+ self.channel_atomicity.statistics().total_wakeups_expected
+ self.channel_atomicity.statistics().total_wakeups_actual)
as usize,
},
),
OracleEntryReport::from_result(
"waker_dedup",
self.waker_dedup
.check_for_violations()
.ok()
.and_then(|violations| violations.first().cloned())
.map(OracleViolation::WakerDedup),
OracleStats {
entities_tracked: self.waker_dedup.statistics().active_wakers as usize,
events_recorded: (self.waker_dedup.statistics().total_wakers_registered
+ self.waker_dedup.statistics().total_wakers_woken
+ self.waker_dedup.statistics().total_wakers_dropped)
as usize,
},
),
OracleEntryReport::from_result(
"actor_leak",
self.actor_leak
.check(now)
.err()
.map(OracleViolation::ActorLeak),
OracleStats {
entities_tracked: self.actor_leak.actor_count(),
events_recorded: self.actor_leak.actor_count()
+ self.actor_leak.stopped_count()
+ self.actor_leak.closed_region_count(),
},
),
OracleEntryReport::from_result(
"supervision",
self.supervision
.check(now)
.err()
.map(OracleViolation::Supervision),
OracleStats {
entities_tracked: self.supervision.failure_count()
+ self.supervision.restart_count(),
events_recorded: self.supervision.failure_count()
+ self.supervision.restart_count()
+ self.supervision.escalation_count(),
},
),
OracleEntryReport::from_result(
"mailbox",
self.mailbox.check(now).err().map(OracleViolation::Mailbox),
OracleStats {
entities_tracked: self.mailbox.mailbox_count(),
events_recorded: self.mailbox.mailbox_count(),
},
),
OracleEntryReport::from_result(
"rref_access",
self.rref_access
.check()
.err()
.map(OracleViolation::RRefAccess),
OracleStats {
entities_tracked: self.rref_access.rref_count(),
events_recorded: self.rref_access.rref_count()
+ self.rref_access.task_count()
+ self.rref_access.closed_region_count(),
},
),
OracleEntryReport::from_result(
"reply_linearity",
self.reply_linearity
.check()
.err()
.map(OracleViolation::ReplyLinearity),
OracleStats {
entities_tracked: self.reply_linearity.created_count(),
events_recorded: self.reply_linearity.created_count()
+ self.reply_linearity.resolved_count(),
},
),
OracleEntryReport::from_result(
"registry_lease",
self.registry_lease
.check()
.err()
.map(OracleViolation::RegistryLease),
OracleStats {
entities_tracked: self.registry_lease.acquired_count(),
events_recorded: self.registry_lease.acquired_count()
+ self.registry_lease.resolved_count(),
},
),
OracleEntryReport::from_result(
"down_order",
self.down_order
.check()
.err()
.map(OracleViolation::DownOrder),
OracleStats {
entities_tracked: self.down_order.monitor_count(),
events_recorded: self.down_order.down_count(),
},
),
OracleEntryReport::from_result(
"supervisor_quiescence",
self.supervisor_quiescence
.check()
.err()
.map(OracleViolation::SupervisorQuiescence),
OracleStats {
entities_tracked: self.supervisor_quiescence.supervisor_count(),
events_recorded: self.supervisor_quiescence.child_count()
+ self.supervisor_quiescence.closed_region_count(),
},
),
#[cfg(feature = "messaging-fabric")]
self.fabric_publish.report_entry(),
#[cfg(feature = "messaging-fabric")]
self.fabric_reply.report_entry(),
#[cfg(feature = "messaging-fabric")]
self.fabric_quiescence.report_entry(),
#[cfg(feature = "messaging-fabric")]
self.fabric_redelivery.report_entry(),
];
let total = entries.len();
let passed = entries.iter().filter(|e| e.passed).count();
let failed = total - passed;
OracleReport {
entries,
total,
passed,
failed,
check_time_nanos: now.as_nanos(),
}
}
#[must_use]
pub fn report_and_observe(&mut self, now: Time) -> OracleReport {
let report = self.report(now);
if let Some(ref mut monitor) = self.eprocess_monitor {
monitor.observe_report(&report);
}
report
}
#[must_use]
pub fn eprocess_rejected_invariants(&self) -> Vec<String> {
self.eprocess_monitor.as_ref().map_or_else(Vec::new, |m| {
m.rejected_invariants()
.into_iter()
.map(String::from)
.collect()
})
}
#[must_use]
pub fn check_obligation_theory(
&self,
marking_events: &[MarkingEvent],
) -> Vec<ObligationTheoryViolation> {
let mut violations = Vec::new();
let mut marking_analyzer = MarkingAnalyzer::new();
let marking_result = marking_analyzer.analyze(marking_events);
if !marking_result.is_safe() {
for leak in &marking_result.leaks {
violations.push(ObligationTheoryViolation::MarkingLeak {
description: format!("VASS marking non-zero at region close: {leak:?}"),
});
}
for invalid in &marking_result.invalid_transitions {
violations.push(ObligationTheoryViolation::InvalidTransition {
description: format!("Invalid marking transition: {invalid:?}"),
});
}
}
let mut contract_checker = ContractChecker::new();
let contract_result = contract_checker.check(marking_events);
for violation in &contract_result.violations {
violations.push(ObligationTheoryViolation::ContractViolation {
description: format!("{violation:?}"),
});
}
let mut aliasing_prover = NoAliasingProver::new();
let aliasing_result = aliasing_prover.check(marking_events);
for counterexample in &aliasing_result.counterexamples {
violations.push(ObligationTheoryViolation::AliasingViolation {
description: format!("{counterexample:?}"),
});
}
violations
}
}
#[derive(Debug, Clone)]
pub enum ObligationTheoryViolation {
MarkingLeak {
description: String,
},
InvalidTransition {
description: String,
},
ContractViolation {
description: String,
},
AliasingViolation {
description: String,
},
}
impl std::fmt::Display for ObligationTheoryViolation {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::MarkingLeak { description } => write!(f, "Marking leak: {description}"),
Self::InvalidTransition { description } => {
write!(f, "Invalid transition: {description}")
}
Self::ContractViolation { description } => {
write!(f, "Contract violation: {description}")
}
Self::AliasingViolation { description } => {
write!(f, "Aliasing violation: {description}")
}
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct OracleStats {
pub entities_tracked: usize,
pub events_recorded: usize,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OracleEntryReport {
pub invariant: String,
pub passed: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub violation: Option<String>,
pub stats: OracleStats,
}
impl OracleEntryReport {
fn from_result(
invariant: &'static str,
violation: Option<OracleViolation>,
stats: OracleStats,
) -> Self {
let passed = violation.is_none();
let violation_text = violation.map(|violation| violation.to_string());
crate::tracing_compat::info!(
event = "oracle_check",
invariant = invariant,
passed,
entities_tracked = stats.entities_tracked,
events_recorded = stats.events_recorded,
details = violation_text.as_deref().unwrap_or("clean"),
"oracle_check"
);
Self {
invariant: invariant.to_owned(),
passed,
violation: violation_text,
stats,
}
}
}
pub trait Oracle {
fn invariant_name(&self) -> &'static str;
fn violation(&self) -> Option<OracleViolation>;
fn stats(&self) -> OracleStats;
fn report_entry(&self) -> OracleEntryReport {
OracleEntryReport::from_result(self.invariant_name(), self.violation(), self.stats())
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OracleReport {
pub entries: Vec<OracleEntryReport>,
pub total: usize,
pub passed: usize,
pub failed: usize,
pub check_time_nanos: u64,
}
impl OracleReport {
#[must_use]
pub fn all_passed(&self) -> bool {
self.failed == 0
}
#[must_use]
pub fn failures(&self) -> Vec<&OracleEntryReport> {
self.entries.iter().filter(|e| !e.passed).collect()
}
#[must_use]
pub fn entry(&self, invariant: &str) -> Option<&OracleEntryReport> {
self.entries
.iter()
.find(|e| e.invariant.as_str() == invariant)
}
#[must_use]
pub fn to_json(&self) -> serde_json::Value {
serde_json::to_value(self).unwrap_or_default()
}
#[must_use]
pub fn to_text(&self) -> String {
let mut out = String::new();
let _ = writeln!(
&mut out,
"Oracle Report: {}/{} passed ({} failed)",
self.passed, self.total, self.failed
);
let _ = writeln!(&mut out, "Check time: {}ns", self.check_time_nanos);
let _ = writeln!(&mut out, "---");
for entry in &self.entries {
let status = if entry.passed { "PASS" } else { "FAIL" };
let _ = write!(
&mut out,
"[{}] {} (tracked={}, events={})",
status, entry.invariant, entry.stats.entities_tracked, entry.stats.events_recorded
);
if let Some(ref v) = entry.violation {
let _ = write!(&mut out, " -- {v}");
}
let _ = writeln!(&mut out);
}
out
}
}
#[cfg(test)]
mod tests {
use super::*;
#[cfg(feature = "tracing-integration")]
use parking_lot::Mutex;
#[cfg(feature = "tracing-integration")]
use std::collections::BTreeMap;
#[cfg(feature = "tracing-integration")]
use std::sync::Arc;
#[cfg(feature = "tracing-integration")]
use tracing::Subscriber;
#[cfg(feature = "tracing-integration")]
use tracing::field::{Field, Visit};
#[cfg(feature = "tracing-integration")]
use tracing_subscriber::layer::{Context, Layer};
#[cfg(feature = "tracing-integration")]
use tracing_subscriber::prelude::*;
#[cfg(feature = "tracing-integration")]
use tracing_subscriber::registry::LookupSpan;
#[cfg(feature = "tracing-integration")]
#[derive(Debug, Clone, PartialEq, Eq)]
struct RecordedEvent {
fields: BTreeMap<String, String>,
}
#[cfg(feature = "tracing-integration")]
#[derive(Default)]
struct EventFieldVisitor {
fields: BTreeMap<String, String>,
}
#[cfg(feature = "tracing-integration")]
impl Visit for EventFieldVisitor {
fn record_bool(&mut self, field: &Field, value: bool) {
self.fields
.insert(field.name().to_owned(), value.to_string());
}
fn record_u64(&mut self, field: &Field, value: u64) {
self.fields
.insert(field.name().to_owned(), value.to_string());
}
fn record_str(&mut self, field: &Field, value: &str) {
self.fields
.insert(field.name().to_owned(), value.to_owned());
}
fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
self.fields
.insert(field.name().to_owned(), format!("{value:?}"));
}
}
#[cfg(feature = "tracing-integration")]
#[derive(Default)]
struct EventRecorder {
events: Arc<Mutex<Vec<RecordedEvent>>>,
}
#[cfg(feature = "tracing-integration")]
impl<S> Layer<S> for EventRecorder
where
S: Subscriber + for<'a> LookupSpan<'a>,
{
fn on_event(&self, event: &tracing::Event<'_>, _ctx: Context<'_, S>) {
let mut visitor = EventFieldVisitor::default();
event.record(&mut visitor);
self.events.lock().push(RecordedEvent {
fields: visitor.fields,
});
}
}
fn init_test(name: &str) {
crate::test_utils::init_test_logging();
crate::test_phase!(name);
}
#[test]
fn oracle_suite_default_is_clean() {
init_test("oracle_suite_default_is_clean");
let mut suite = OracleSuite::new();
let violations = suite.check_all(Time::ZERO);
let empty = violations.is_empty();
crate::assert_with_log!(empty, "suite clean", true, empty);
crate::test_complete!("oracle_suite_default_is_clean");
}
#[test]
fn hydrate_temporal_from_state_replays_finalizer_history() {
init_test("hydrate_temporal_from_state_replays_finalizer_history");
let mut state = crate::runtime::RuntimeState::new();
let region = state.create_root_region(crate::types::Budget::INFINITE);
state.now = Time::from_nanos(10);
let registered = state.register_sync_finalizer(region, || {});
crate::assert_with_log!(registered, "registered", true, registered);
state.now = Time::from_nanos(20);
state.record_finalizer_close_for_test(region);
let mut suite = OracleSuite::new();
suite.hydrate_temporal_from_state(&state, state.now);
let violation = suite
.finalizer
.check()
.expect_err("missing finalizer run should survive report hydration");
crate::assert_with_log!(
violation.region == region,
"region",
region,
violation.region
);
crate::assert_with_log!(
violation.unrun_finalizers == vec![FinalizerId(0)],
"unrun finalizers",
vec![FinalizerId(0)],
violation.unrun_finalizers
);
crate::test_complete!("hydrate_temporal_from_state_replays_finalizer_history");
}
#[test]
fn oracle_suite_debug() {
let suite = OracleSuite::new();
let dbg = format!("{suite:?}");
assert!(dbg.contains("OracleSuite"));
}
#[test]
fn oracle_suite_reset_stays_clean() {
let mut suite = OracleSuite::new();
suite.reset();
let violations = suite.check_all(Time::ZERO);
assert!(violations.is_empty());
}
#[test]
fn oracle_suite_report_all_pass() {
let mut suite = OracleSuite::new();
let report = suite.report(Time::ZERO);
assert!(report.all_passed());
assert_eq!(report.failed, 0);
assert_eq!(report.passed, report.total);
assert!(report.failures().is_empty());
}
#[test]
fn oracle_report_debug_clone() {
let mut suite = OracleSuite::new();
let report = suite.report(Time::ZERO);
let dbg = format!("{report:?}");
assert!(dbg.contains("OracleReport"));
let cloned = report.clone();
assert_eq!(cloned.total, report.total);
}
#[test]
fn oracle_report_to_json() {
let mut suite = OracleSuite::new();
let report = suite.report(Time::ZERO);
let json = report.to_json();
assert!(json.is_object());
assert!(json["entries"].is_array());
}
#[test]
fn oracle_report_to_text() {
let mut suite = OracleSuite::new();
let report = suite.report(Time::ZERO);
let text = report.to_text();
assert!(text.contains("Oracle Report"));
assert!(text.contains("PASS"));
}
#[cfg(feature = "tracing-integration")]
#[test]
#[allow(clippy::significant_drop_tightening)]
fn oracle_report_emits_structured_oracle_check_events() {
let mut suite = OracleSuite::new();
let events = Arc::new(Mutex::new(Vec::new()));
let recorder = EventRecorder {
events: events.clone(),
};
let subscriber = tracing_subscriber::registry().with(recorder);
let report = tracing::subscriber::with_default(subscriber, || suite.report(Time::ZERO));
assert!(report.all_passed());
let events = events.lock();
let task_leak_event = events.iter().find(|event| {
event.fields.get("event").map(String::as_str) == Some("oracle_check")
&& event.fields.get("invariant").map(String::as_str) == Some("task_leak")
});
let task_leak_event = task_leak_event.expect("task_leak oracle_check should be emitted");
assert_eq!(
task_leak_event.fields.get("passed").map(String::as_str),
Some("true")
);
assert_eq!(
task_leak_event.fields.get("details").map(String::as_str),
Some("clean")
);
}
#[test]
fn oracle_report_failure_helpers_surface_failed_entries() {
let report = OracleReport {
entries: vec![
OracleEntryReport {
invariant: "task_leak".to_string(),
passed: true,
violation: None,
stats: OracleStats {
entities_tracked: 2,
events_recorded: 4,
},
},
OracleEntryReport {
invariant: "obligation_leak".to_string(),
passed: false,
violation: Some("Obligation leak: leaked obligation".to_string()),
stats: OracleStats {
entities_tracked: 3,
events_recorded: 6,
},
},
],
total: 2,
passed: 1,
failed: 1,
check_time_nanos: 42,
};
let failures = report.failures();
assert_eq!(failures.len(), 1);
assert_eq!(failures[0].invariant, "obligation_leak");
assert!(!report.all_passed());
let text = report.to_text();
assert!(text.contains("FAIL"));
assert!(text.contains("Obligation leak: leaked obligation"));
}
#[test]
fn oracle_report_entry_lookup() {
let mut suite = OracleSuite::new();
let report = suite.report(Time::ZERO);
let entry = report.entry("task_leak");
assert!(entry.is_some());
assert!(entry.unwrap().passed);
assert!(report.entry("nonexistent_oracle").is_none());
}
#[test]
fn oracle_stats_debug_clone_eq() {
let stats = OracleStats {
entities_tracked: 5,
events_recorded: 10,
};
let dbg = format!("{stats:?}");
assert!(dbg.contains("OracleStats"));
let cloned = stats.clone();
assert_eq!(stats, cloned);
}
#[test]
fn oracle_stats_ne() {
let a = OracleStats {
entities_tracked: 5,
events_recorded: 10,
};
let b = OracleStats {
entities_tracked: 3,
events_recorded: 10,
};
assert_ne!(a, b);
}
#[test]
fn oracle_entry_report_debug_clone() {
let entry = OracleEntryReport {
invariant: "test".to_owned(),
passed: true,
violation: None,
stats: OracleStats {
entities_tracked: 0,
events_recorded: 0,
},
};
let dbg = format!("{entry:?}");
assert!(dbg.contains("OracleEntryReport"));
let cloned = entry;
assert_eq!(cloned.invariant, "test");
assert!(cloned.passed);
}
#[test]
fn oracle_entry_report_with_violation() {
let entry = OracleEntryReport {
invariant: "failing".to_owned(),
passed: false,
violation: Some("something leaked".to_owned()),
stats: OracleStats {
entities_tracked: 1,
events_recorded: 1,
},
};
assert!(!entry.passed);
assert!(entry.violation.as_deref().unwrap().contains("leaked"));
}
#[test]
fn oracle_violation_debug() {
let mut suite = OracleSuite::new();
let violations = suite.check_all(Time::ZERO);
assert!(violations.is_empty());
}
#[test]
fn oracle_violation_error_trait() {
fn assert_error_impl<T: std::error::Error>() {}
assert_error_impl::<OracleViolation>();
}
}