use crate::console::Console;
use crate::observability::spectral_health::{
SpectralHealthMonitor, SpectralHealthReport, SpectralThresholds,
};
use crate::record::ObligationState;
use crate::record::region::RegionState;
use crate::record::task::TaskState;
use crate::runtime::state::RuntimeState;
use crate::time::TimerDriverHandle;
use crate::tracing_compat::{debug, trace, warn};
use crate::types::{CancelKind, ObligationId, RegionId, TaskId, Time};
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::fmt;
use std::sync::Arc;
/// Read-only diagnostic queries over a runtime's live state: wait-graph
/// construction, deadlock classification, and human-readable explanations
/// for stuck regions, blocked tasks, and leaked obligations.
#[derive(Debug)]
pub struct Diagnostics {
    // Shared view of the runtime's tasks, regions, and obligations.
    state: Arc<RuntimeState>,
    // Spectral analyzer kept behind a mutex so `&self` queries can update it.
    spectral_monitor: parking_lot::Mutex<SpectralHealthMonitor>,
}
impl Diagnostics {
    /// Creates a diagnostics facade over the shared runtime state, using
    /// default spectral-health thresholds.
    #[must_use]
    pub fn new(state: Arc<RuntimeState>) -> Self {
        Self {
            state,
            spectral_monitor: parking_lot::Mutex::new(SpectralHealthMonitor::new(
                SpectralThresholds::default(),
            )),
        }
    }
    /// Same as [`Self::new`]; the console handle is currently unused.
    #[must_use]
    pub fn with_console(state: Arc<RuntimeState>, _console: Console) -> Self {
        Self {
            state,
            spectral_monitor: parking_lot::Mutex::new(SpectralHealthMonitor::new(
                SpectralThresholds::default(),
            )),
        }
    }
    /// Current time: the timer driver's clock when one is installed,
    /// otherwise the runtime state's cached `now`.
    fn now(&self) -> Time {
        self.state
            .timer_driver()
            .map_or(self.state.now, TimerDriverHandle::now)
    }
    /// Snapshots the wait-for graph over non-terminal tasks.
    ///
    /// Nodes are live task ids (sorted; position defines a dense index).
    /// A directed edge `(waiter, target)` means `waiter` is blocked on
    /// `target`. The undirected edge list is the deduplicated symmetric
    /// projection used for spectral analysis.
    fn build_task_wait_graph(&self) -> TaskWaitGraph {
        let mut task_ids: Vec<TaskId> = self
            .state
            .tasks_iter()
            .filter_map(|(_, task)| (!task.state.is_terminal()).then_some(task.id))
            .collect();
        task_ids.sort();
        let index_by_task: BTreeMap<TaskId, usize> = task_ids
            .iter()
            .enumerate()
            .map(|(i, id)| (*id, i))
            .collect();
        let mut directed_edges = Vec::new();
        for (_, task) in self.state.tasks_iter() {
            if task.state.is_terminal() {
                continue;
            }
            let Some(&target_idx) = index_by_task.get(&task.id) else {
                continue;
            };
            // Only edges between two live tasks are recorded; waiters that
            // are terminal (or unknown) are dropped.
            for waiter in &task.waiters {
                if let Some(&waiter_idx) = index_by_task.get(waiter) {
                    directed_edges.push((waiter_idx, target_idx));
                }
            }
        }
        directed_edges.sort_unstable();
        directed_edges.dedup();
        // Normalize each edge to (min, max) and dedupe via a BTreeSet so the
        // undirected list is sorted and unique.
        let undirected_edges: Vec<(usize, usize)> = directed_edges
            .iter()
            .map(|(u, v)| if u < v { (*u, *v) } else { (*v, *u) })
            .collect::<std::collections::BTreeSet<_>>()
            .into_iter()
            .collect();
        TaskWaitGraph {
            task_ids,
            directed_edges,
            undirected_edges,
        }
    }
    /// Runs the spectral health monitor over the current wait graph,
    /// feeding it the trapped-cycle flag computed from SCC analysis.
    #[must_use]
    pub fn analyze_structural_health(&self) -> SpectralHealthReport {
        let graph = self.build_task_wait_graph();
        let adjacency = wait_graph_adjacency(&graph);
        let mut monitor = self.spectral_monitor.lock();
        monitor.analyze_with_trapped_cycle(
            graph.task_ids.len(),
            &graph.undirected_edges,
            has_trapped_wait_cycle(&adjacency),
        )
    }
    /// Classifies wait-for cycles by direction: for each cyclic SCC it
    /// counts edges entering (ingress) and leaving (egress) the
    /// component. A cyclic component with zero egress is "trapped" —
    /// nothing inside it can be unblocked from outside.
    #[must_use]
    pub fn analyze_directional_deadlock(&self) -> DirectionalDeadlockReport {
        let graph = self.build_task_wait_graph();
        if graph.task_ids.is_empty() {
            return DirectionalDeadlockReport::empty();
        }
        let adjacency = wait_graph_adjacency(&graph);
        let sccs = strongly_connected_components(&adjacency);
        let mut components = Vec::new();
        let mut trapped = 0_u32;
        let mut cycle_nodes = 0_usize;
        for nodes in sccs {
            // A singleton SCC is cyclic only if it carries a self-loop.
            let has_cycle = if nodes.len() > 1 {
                true
            } else {
                let n0 = nodes[0];
                adjacency[n0].contains(&n0)
            };
            if !has_cycle {
                continue;
            }
            cycle_nodes = cycle_nodes.saturating_add(nodes.len());
            let mut ingress = 0_u32;
            let mut egress = 0_u32;
            // Egress: edges from inside the component to outside. `nodes`
            // comes from Tarjan sorted ascending, so binary_search is valid.
            for &u in &nodes {
                for &v in &adjacency[u] {
                    if nodes.binary_search(&v).is_ok() {
                        continue;
                    }
                    egress = egress.saturating_add(1);
                }
            }
            // Ingress: edges from outside the component to inside.
            let node_set: std::collections::BTreeSet<usize> = nodes.iter().copied().collect();
            for (u, edges) in adjacency.iter().enumerate() {
                if node_set.contains(&u) {
                    continue;
                }
                for &v in edges {
                    if node_set.contains(&v) {
                        ingress = ingress.saturating_add(1);
                    }
                }
            }
            let trapped_component = egress == 0;
            if trapped_component {
                trapped = trapped.saturating_add(1);
            }
            let mut tasks: Vec<TaskId> = nodes.iter().map(|idx| graph.task_ids[*idx]).collect();
            tasks.sort();
            components.push(DeadlockCycle {
                tasks,
                ingress_edges: ingress,
                egress_edges: egress,
                trapped: trapped_component,
            });
        }
        // Report largest cycles first.
        components.sort_by_key(|c| c.tasks.len());
        components.reverse();
        #[allow(clippy::cast_precision_loss)]
        let cycle_ratio = if graph.task_ids.is_empty() {
            0.0
        } else {
            cycle_nodes as f64 / graph.task_ids.len() as f64
        };
        #[allow(clippy::cast_precision_loss)]
        let trapped_ratio = if components.is_empty() {
            0.0
        } else {
            f64::from(trapped) / components.len() as f64
        };
        // Weighted blend: trapped components dominate the risk score.
        let risk_score = 0.6f64
            .mul_add(trapped_ratio, 0.4 * cycle_ratio)
            .clamp(0.0, 1.0);
        let severity = if trapped > 0 {
            DeadlockSeverity::Critical
        } else if !components.is_empty() {
            DeadlockSeverity::Elevated
        } else {
            DeadlockSeverity::None
        };
        DirectionalDeadlockReport {
            severity,
            risk_score,
            cycles: components,
        }
    }
    /// Explains why `region_id` has not closed yet: open children, live
    /// tasks, and reserved obligations, plus remediation hints.
    #[must_use]
    pub fn explain_region_open(&self, region_id: RegionId) -> RegionOpenExplanation {
        trace!(region_id = ?region_id, "diagnostics: explain_region_open");
        let Some(region) = self.state.region(region_id) else {
            return RegionOpenExplanation {
                region_id,
                region_state: None,
                reasons: vec![Reason::RegionNotFound],
                recommendations: vec!["Verify region id is valid".to_string()],
            };
        };
        let region_state = region.state();
        // A closed region needs no explanation.
        if region_state == RegionState::Closed {
            return RegionOpenExplanation {
                region_id,
                region_state: Some(region_state),
                reasons: Vec::new(),
                recommendations: Vec::new(),
            };
        }
        let mut reasons = Vec::new();
        // Child regions that have not closed keep the parent open.
        let mut child_ids = region.child_ids();
        child_ids.sort();
        for child_id in child_ids {
            if let Some(child) = self.state.region(child_id) {
                let child_state = child.state();
                if child_state != RegionState::Closed {
                    reasons.push(Reason::ChildRegionOpen {
                        child_id,
                        child_state,
                    });
                }
            }
        }
        // Non-terminal tasks owned by the region keep it open.
        let mut task_ids = region.task_ids();
        task_ids.sort();
        for task_id in task_ids {
            if let Some(task) = self.state.task(task_id) {
                if !task.state.is_terminal() {
                    reasons.push(Reason::TaskRunning {
                        task_id,
                        task_state: task.state_name().to_string(),
                        poll_count: task.total_polls,
                    });
                }
            }
        }
        // Reserved obligations in this region block closure until they are
        // committed or aborted; collect and sort for deterministic output.
        let mut held = Vec::new();
        for (_, ob) in self.state.obligations_iter() {
            if ob.region == region_id && ob.state == ObligationState::Reserved {
                held.push((ob.id, ob.holder, ob.kind));
            }
        }
        held.sort_by_key(|(id, _, _)| *id);
        for (id, holder, kind) in held {
            reasons.push(Reason::ObligationHeld {
                obligation_id: id,
                obligation_type: format!("{kind:?}"),
                holder_task: holder,
            });
        }
        let mut recommendations = Vec::new();
        if reasons
            .iter()
            .any(|r| matches!(r, Reason::ChildRegionOpen { .. }))
        {
            recommendations.push("Wait for child regions to close, or cancel them.".to_string());
        }
        if reasons
            .iter()
            .any(|r| matches!(r, Reason::TaskRunning { .. }))
        {
            recommendations
                .push("Wait for live tasks to complete, or cancel the region.".to_string());
        }
        if reasons
            .iter()
            .any(|r| matches!(r, Reason::ObligationHeld { .. }))
        {
            recommendations
                .push("Ensure obligations are committed/aborted before closing.".to_string());
        }
        // Surface deadlock risk alongside the structural reasons.
        let deadlock = self.analyze_directional_deadlock();
        if deadlock.severity != DeadlockSeverity::None {
            recommendations.push(format!(
                "Directional deadlock risk {:?} (score {:.3}); inspect cycles and break wait-for loops.",
                deadlock.severity, deadlock.risk_score
            ));
        }
        debug!(
            region_id = ?region_id,
            region_state = ?region_state,
            reason_count = reasons.len(),
            "diagnostics: region open explanation computed"
        );
        RegionOpenExplanation {
            region_id,
            region_state: Some(region_state),
            reasons,
            recommendations,
        }
    }
    /// Explains why `task_id` is not making progress, based on its
    /// current lifecycle state.
    #[must_use]
    pub fn explain_task_blocked(&self, task_id: TaskId) -> TaskBlockedExplanation {
        trace!(task_id = ?task_id, "diagnostics: explain_task_blocked");
        let Some(task) = self.state.task(task_id) else {
            return TaskBlockedExplanation {
                task_id,
                block_reason: BlockReason::TaskNotFound,
                details: Vec::new(),
                recommendations: vec!["Verify task id is valid".to_string()],
            };
        };
        let mut details = Vec::new();
        let mut recommendations = Vec::new();
        let block_reason = match &task.state {
            TaskState::Created => {
                recommendations.push("Task has not started polling yet.".to_string());
                BlockReason::NotStarted
            }
            TaskState::Running => {
                // A notified wake means the scheduler simply has not run
                // the task yet; otherwise it is parked on an await point.
                if task.wake_state.is_notified() {
                    recommendations
                        .push("Task has a pending wake; it should be scheduled soon.".to_string());
                    BlockReason::AwaitingSchedule
                } else {
                    recommendations
                        .push("Task appears to be awaiting an async operation.".to_string());
                    BlockReason::AwaitingFuture {
                        description: "unknown await point".to_string(),
                    }
                }
            }
            TaskState::CancelRequested { reason, .. } => {
                details.push(format!("cancel kind: {}", reason.kind));
                // NOTE(review): the `&` here is redundant — `as_deref()`
                // already yields `Option<&str>`, so `msg` binds as `&&str`.
                // Harmless (it still formats correctly), but could be
                // `reason.message.as_deref()`.
                if let Some(msg) = &reason.message.as_deref() {
                    details.push(format!("message: {msg}"));
                }
                recommendations.push("Task is cancelling; wait for drain/finalizers.".to_string());
                BlockReason::CancelRequested {
                    reason: CancelReasonInfo::from_reason(reason.kind, reason.message.as_deref()),
                }
            }
            TaskState::Cancelling {
                reason,
                cleanup_budget,
            } => {
                details.push(format!("cancel kind: {}", reason.kind));
                details.push(format!(
                    "cleanup polls remaining: {}",
                    cleanup_budget.poll_quota
                ));
                BlockReason::RunningCleanup {
                    reason: CancelReasonInfo::from_reason(reason.kind, reason.message.as_deref()),
                    polls_remaining: cleanup_budget.poll_quota,
                }
            }
            TaskState::Finalizing {
                reason,
                cleanup_budget,
            } => {
                details.push(format!("cancel kind: {}", reason.kind));
                details.push(format!(
                    "cleanup polls remaining: {}",
                    cleanup_budget.poll_quota
                ));
                BlockReason::Finalizing {
                    reason: CancelReasonInfo::from_reason(reason.kind, reason.message.as_deref()),
                    polls_remaining: cleanup_budget.poll_quota,
                }
            }
            TaskState::Completed(outcome) => {
                details.push(format!("outcome: {outcome:?}"));
                BlockReason::Completed
            }
        };
        if !task.waiters.is_empty() {
            details.push(format!("waiters: {}", task.waiters.len()));
        }
        TaskBlockedExplanation {
            task_id,
            block_reason,
            details,
            recommendations,
        }
    }
    /// Scans for obligations still in `Reserved` state that look leaked,
    /// reporting each with its age relative to the current clock.
    #[must_use]
    pub fn find_leaked_obligations(&self) -> Vec<ObligationLeak> {
        let now = self.now();
        let mut leaks = Vec::new();
        for (_, ob) in self.state.obligations_iter() {
            if ob.state == ObligationState::Reserved {
                // NOTE(review): reservations whose holder already completed
                // are skipped — presumably the runtime reaps those through a
                // separate invariant check; confirm, since a Reserved
                // obligation with a finished holder is otherwise the classic
                // leak shape.
                if let Some(holder) = self.state.task(ob.holder) {
                    if matches!(holder.state, TaskState::Completed(_)) {
                        continue;
                    }
                }
                let age = std::time::Duration::from_nanos(now.duration_since(ob.reserved_at));
                leaks.push(ObligationLeak {
                    obligation_id: ob.id,
                    obligation_type: format!("{:?}", ob.kind),
                    holder_task: Some(ob.holder),
                    region_id: ob.region,
                    age,
                });
            }
        }
        leaks.sort_by_key(|l| (l.region_id, l.obligation_id));
        if !leaks.is_empty() {
            warn!(
                count = leaks.len(),
                "diagnostics: potential obligation leaks detected"
            );
        }
        leaks
    }
}
/// Dense snapshot of the task wait-for graph. Node `i` corresponds to
/// `task_ids[i]`; edges are index pairs into `task_ids`.
#[derive(Debug, Clone)]
struct TaskWaitGraph {
    // Sorted live task ids; position defines the node index.
    task_ids: Vec<TaskId>,
    // (waiter, target): waiter is blocked on target.
    directed_edges: Vec<(usize, usize)>,
    // Deduplicated symmetric projection, each pair normalized to (min, max).
    undirected_edges: Vec<(usize, usize)>,
}
/// Expands the directed edge list of `graph` into per-node adjacency
/// lists (each sorted and deduplicated). Edges that reference an
/// out-of-range node index are dropped defensively.
fn wait_graph_adjacency(graph: &TaskWaitGraph) -> Vec<Vec<usize>> {
    let node_count = graph.task_ids.len();
    let mut lists: Vec<Vec<usize>> = vec![Vec::new(); node_count];
    for &(src, dst) in &graph.directed_edges {
        if src < node_count && dst < node_count {
            lists[src].push(dst);
        }
    }
    for neighbors in &mut lists {
        neighbors.sort_unstable();
        neighbors.dedup();
    }
    lists
}
/// Returns `true` when some strongly connected component both contains a
/// cycle and has no edge leaving it — i.e. the tasks involved can never
/// be unblocked from outside the component.
fn has_trapped_wait_cycle(adjacency: &[Vec<usize>]) -> bool {
    strongly_connected_components(adjacency)
        .into_iter()
        .any(|component| {
            // A singleton component is cyclic only via a self-loop.
            let cyclic = component.len() > 1 || {
                let only = component[0];
                adjacency[only].contains(&only)
            };
            if !cyclic {
                return false;
            }
            let members: std::collections::BTreeSet<usize> = component.iter().copied().collect();
            let has_egress = component
                .iter()
                .any(|&node| adjacency[node].iter().any(|next| !members.contains(next)));
            !has_egress
        })
}
/// Overall severity of directional-deadlock analysis.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DeadlockSeverity {
    // No wait-for cycles observed.
    None,
    // Cycles exist but every one still has an egress edge.
    Elevated,
    // At least one trapped (zero-egress) cycle exists.
    Critical,
}
/// One cyclic strongly connected component of the wait-for graph.
#[derive(Debug, Clone)]
pub struct DeadlockCycle {
    // Sorted ids of the tasks participating in the cycle.
    pub tasks: Vec<TaskId>,
    // Edges entering the component from outside.
    pub ingress_edges: u32,
    // Edges leaving the component; zero means the cycle is trapped.
    pub egress_edges: u32,
    pub trapped: bool,
}
/// Result of [`Diagnostics::analyze_directional_deadlock`]: severity,
/// a blended risk score in `[0, 1]`, and the detected cycles
/// (largest first).
#[derive(Debug, Clone)]
pub struct DirectionalDeadlockReport {
    pub severity: DeadlockSeverity,
    pub risk_score: f64,
    pub cycles: Vec<DeadlockCycle>,
}
impl DirectionalDeadlockReport {
    /// Report for a runtime with no live tasks: no cycles, zero risk.
    #[must_use]
    fn empty() -> Self {
        Self {
            cycles: Vec::new(),
            risk_score: 0.0,
            severity: DeadlockSeverity::None,
        }
    }
}
/// Tarjan's algorithm: partitions the directed graph given as adjacency
/// lists into strongly connected components.
///
/// Each component is returned with its node indices sorted ascending;
/// components are emitted in the order Tarjan finishes them (reverse
/// topological order of the condensation).
#[must_use]
fn strongly_connected_components(adjacency: &[Vec<usize>]) -> Vec<Vec<usize>> {
    // Mutable DFS bookkeeping shared across recursive visits.
    struct Dfs<'g> {
        graph: &'g [Vec<usize>],
        next_index: usize,
        stack: Vec<usize>,
        on_stack: Vec<bool>,
        visit_index: Vec<Option<usize>>,
        low: Vec<usize>,
        components: Vec<Vec<usize>>,
    }
    impl Dfs<'_> {
        fn visit(&mut self, node: usize) {
            self.visit_index[node] = Some(self.next_index);
            self.low[node] = self.next_index;
            self.next_index += 1;
            self.on_stack[node] = true;
            self.stack.push(node);
            for &succ in &self.graph[node] {
                if self.visit_index[succ].is_none() {
                    // Tree edge: recurse, then propagate the low-link.
                    self.visit(succ);
                    self.low[node] = self.low[node].min(self.low[succ]);
                } else if self.on_stack[succ] {
                    // Back/cross edge into the current DFS stack.
                    self.low[node] =
                        self.low[node].min(self.visit_index[succ].unwrap_or(usize::MAX));
                }
            }
            // `node` is a component root when its low-link equals its index.
            if Some(self.low[node]) == self.visit_index[node] {
                let mut component = Vec::new();
                loop {
                    let Some(member) = self.stack.pop() else { break };
                    self.on_stack[member] = false;
                    component.push(member);
                    if member == node {
                        break;
                    }
                }
                component.sort_unstable();
                self.components.push(component);
            }
        }
    }
    let node_count = adjacency.len();
    let mut dfs = Dfs {
        graph: adjacency,
        next_index: 0,
        stack: Vec::new(),
        on_stack: vec![false; node_count],
        visit_index: vec![None; node_count],
        low: vec![0; node_count],
        components: Vec::new(),
    };
    for node in 0..node_count {
        if dfs.visit_index[node].is_none() {
            dfs.visit(node);
        }
    }
    dfs.components
}
/// Explanation of why a region has not closed, plus suggested remedies.
#[derive(Debug, Clone)]
pub struct RegionOpenExplanation {
    pub region_id: RegionId,
    // `None` when the region id was not found.
    pub region_state: Option<RegionState>,
    pub reasons: Vec<Reason>,
    pub recommendations: Vec<String>,
}
impl fmt::Display for RegionOpenExplanation {
    /// Multi-line human-readable rendering: header, optional state,
    /// one `-` line per reason and one `->` line per recommendation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        writeln!(f, "Region {:?} is still open.", self.region_id)?;
        if let Some(state) = self.region_state {
            writeln!(f, " state: {state:?}")?;
        }
        self.reasons
            .iter()
            .try_for_each(|reason| writeln!(f, " - {reason}"))?;
        self.recommendations
            .iter()
            .try_for_each(|tip| writeln!(f, " -> {tip}"))
    }
}
/// A single cause keeping a region open.
#[derive(Debug, Clone)]
pub enum Reason {
    // The queried region id does not exist.
    RegionNotFound,
    // A child region has not reached `Closed`.
    ChildRegionOpen {
        child_id: RegionId,
        child_state: RegionState,
    },
    // A task owned by the region is still non-terminal.
    TaskRunning {
        task_id: TaskId,
        task_state: String,
        poll_count: u64,
    },
    // An obligation in the region is still `Reserved`.
    ObligationHeld {
        obligation_id: ObligationId,
        obligation_type: String,
        holder_task: TaskId,
    },
}
impl fmt::Display for Reason {
    /// One-line rendering of each cause, suitable for bullet lists.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::ChildRegionOpen {
                child_id,
                child_state,
            } => write!(f, "child region {child_id:?} still open ({child_state:?})"),
            Self::ObligationHeld {
                obligation_id,
                obligation_type,
                holder_task,
            } => write!(
                f,
                "obligation {obligation_id:?} held by task {holder_task:?} (type={obligation_type})"
            ),
            Self::RegionNotFound => f.write_str("region not found"),
            Self::TaskRunning {
                task_id,
                task_state,
                poll_count,
            } => write!(
                f,
                "task {task_id:?} still running (state={task_state}, polls={poll_count})"
            ),
        }
    }
}
/// Explanation of why a task is not making progress.
#[derive(Debug, Clone)]
pub struct TaskBlockedExplanation {
    pub task_id: TaskId,
    pub block_reason: BlockReason,
    // Free-form supporting facts (cancel kind, waiter counts, ...).
    pub details: Vec<String>,
    pub recommendations: Vec<String>,
}
impl fmt::Display for TaskBlockedExplanation {
    /// Multi-line rendering: header, `-` detail lines, `->` tips.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        writeln!(f, "Task {:?} blocked: {}", self.task_id, self.block_reason)?;
        self.details
            .iter()
            .try_for_each(|detail| writeln!(f, " - {detail}"))?;
        self.recommendations
            .iter()
            .try_for_each(|tip| writeln!(f, " -> {tip}"))
    }
}
/// Why a task is (or appears) blocked, mirroring the task lifecycle.
#[derive(Debug, Clone)]
pub enum BlockReason {
    TaskNotFound,
    NotStarted,
    // Wake pending; waiting only on the scheduler.
    AwaitingSchedule,
    // Parked on an await point.
    AwaitingFuture {
        description: String,
    },
    CancelRequested {
        reason: CancelReasonInfo,
    },
    RunningCleanup {
        reason: CancelReasonInfo,
        polls_remaining: u32,
    },
    Finalizing {
        reason: CancelReasonInfo,
        polls_remaining: u32,
    },
    Completed,
}
impl fmt::Display for BlockReason {
    /// Short lowercase phrase for each blocking state.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::AwaitingFuture { description } => write!(f, "awaiting future ({description})"),
            Self::AwaitingSchedule => f.write_str("awaiting schedule"),
            Self::CancelRequested { reason } => write!(f, "cancel requested ({reason})"),
            Self::Completed => f.write_str("completed"),
            Self::Finalizing {
                reason,
                polls_remaining,
            } => write!(
                f,
                "finalizing ({reason}, polls_remaining={polls_remaining})"
            ),
            Self::NotStarted => f.write_str("not started"),
            Self::RunningCleanup {
                reason,
                polls_remaining,
            } => write!(
                f,
                "running cleanup ({reason}, polls_remaining={polls_remaining})"
            ),
            Self::TaskNotFound => f.write_str("task not found"),
        }
    }
}
/// Describes a cancellation and the region path it propagated along.
#[derive(Debug, Clone)]
pub struct CancellationExplanation {
    pub kind: CancelKind,
    pub message: Option<String>,
    pub propagation_path: Vec<CancellationStep>,
}
/// One hop of cancellation propagation through a region.
#[derive(Debug, Clone)]
pub struct CancellationStep {
    pub region_id: RegionId,
    pub kind: CancelKind,
}
/// Owned snapshot of a cancellation reason (kind plus optional message).
#[derive(Debug, Clone)]
pub struct CancelReasonInfo {
    pub kind: CancelKind,
    pub message: Option<String>,
}
impl CancelReasonInfo {
    /// Builds an owned reason snapshot from a kind and a borrowed
    /// optional message.
    fn from_reason(kind: CancelKind, message: Option<&str>) -> Self {
        let message = message.map(ToOwned::to_owned);
        Self { kind, message }
    }
}
impl fmt::Display for CancelReasonInfo {
    /// Renders `kind` alone, or `kind (message)` when a message exists.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self.message.as_deref() {
            Some(msg) => write!(f, "{} ({msg})", self.kind),
            None => write!(f, "{}", self.kind),
        }
    }
}
/// A reserved obligation flagged as a potential leak.
#[derive(Debug, Clone)]
pub struct ObligationLeak {
    pub obligation_id: ObligationId,
    pub obligation_type: String,
    pub holder_task: Option<TaskId>,
    pub region_id: RegionId,
    // Time since the obligation was reserved, per the diagnostics clock.
    pub age: std::time::Duration,
}
/// Version identifier of the advanced observability contract schema.
pub const ADVANCED_OBSERVABILITY_CONTRACT_VERSION: &str = "doctor-observability-v1";
/// Version identifier of the baseline logging contract this extends.
pub const ADVANCED_OBSERVABILITY_BASELINE_VERSION: &str = "doctor-logging-v1";
/// Event classes defined by the advanced observability contract.
/// Variant order is significant: `PartialOrd`/`Ord` derive from it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum AdvancedEventClass {
    CommandLifecycle,
    IntegrationReliability,
    RemediationSafety,
    ReplayDeterminism,
    VerificationGovernance,
}
impl AdvancedEventClass {
    /// Stable snake_case identifier used in structured logs.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::VerificationGovernance => "verification_governance",
            Self::ReplayDeterminism => "replay_determinism",
            Self::RemediationSafety => "remediation_safety",
            Self::IntegrationReliability => "integration_reliability",
            Self::CommandLifecycle => "command_lifecycle",
        }
    }
}
/// Severity levels for advanced observability events.
/// Variant order is significant: `PartialOrd`/`Ord` derive from it,
/// so `Info < Warning < Error < Critical`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum AdvancedSeverity {
    Info,
    Warning,
    Error,
    Critical,
}
impl AdvancedSeverity {
    /// Stable lowercase identifier used in structured logs.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::Critical => "critical",
            Self::Error => "error",
            Self::Warning => "warning",
            Self::Info => "info",
        }
    }
}
/// Troubleshooting axes an advanced event can be classified along.
/// Variant order is significant: `PartialOrd`/`Ord` derive from it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum TroubleshootingDimension {
    CancellationPath,
    ContractCompliance,
    Determinism,
    ExternalDependency,
    OperatorAction,
    RecoveryPlanning,
    RuntimeInvariant,
}
impl TroubleshootingDimension {
    /// Stable snake_case identifier used in structured logs.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::RuntimeInvariant => "runtime_invariant",
            Self::RecoveryPlanning => "recovery_planning",
            Self::OperatorAction => "operator_action",
            Self::ExternalDependency => "external_dependency",
            Self::Determinism => "determinism",
            Self::ContractCompliance => "contract_compliance",
            Self::CancellationPath => "cancellation_path",
        }
    }
}
/// Describes one event class in the advanced observability contract.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AdvancedEventClassSpec {
    pub class_id: String,
    pub description: String,
}
/// Describes the meaning of one severity level.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AdvancedSeveritySpec {
    pub severity: String,
    pub meaning: String,
}
/// Describes one troubleshooting dimension and its purpose.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TroubleshootingDimensionSpec {
    pub dimension: String,
    pub purpose: String,
}
/// Full advanced observability contract: versions, event classes,
/// severity semantics, troubleshooting dimensions, and compatibility
/// notes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AdvancedObservabilityContract {
    pub contract_version: String,
    pub baseline_contract_version: String,
    pub event_classes: Vec<AdvancedEventClassSpec>,
    pub severity_semantics: Vec<AdvancedSeveritySpec>,
    pub troubleshooting_dimensions: Vec<TroubleshootingDimensionSpec>,
    pub compatibility_notes: Vec<String>,
}
/// Version identifier of the tail-latency taxonomy contract schema.
pub const TAIL_LATENCY_TAXONOMY_CONTRACT_VERSION: &str = "runtime-tail-latency-taxonomy-v1";
/// One structured-log field every tail-latency event must (or may) carry.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TailLatencyLogFieldSpec {
    pub key: String,
    pub unit: String,
    pub required: bool,
    pub meaning: String,
}
/// One measurable signal contributing to a tail-latency term, with its
/// producing symbol and whether it is part of the always-on core set.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TailLatencySignalSpec {
    pub signal_id: String,
    pub structured_log_key: String,
    pub unit: String,
    pub producer_kind: String,
    pub producer_symbol: String,
    pub producer_file: String,
    // Either a direct duration, a proxy signal, or the unknown bucket.
    pub measurement_class: String,
    pub core: bool,
    pub notes: String,
}
/// One additive term of the tail-latency decomposition equation.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TailLatencyTermSpec {
    pub term_id: String,
    pub description: String,
    pub direct_duration_key: String,
    pub attribution_state_key: String,
    pub signals: Vec<TailLatencySignalSpec>,
}
/// The complete tail-latency taxonomy contract: equation, required log
/// fields, terms, sampling policy, and compatibility notes.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TailLatencyTaxonomyContract {
    pub contract_version: String,
    pub equation: String,
    pub total_latency_key: String,
    pub unknown_bucket_key: String,
    pub required_log_fields: Vec<TailLatencyLogFieldSpec>,
    pub terms: Vec<TailLatencyTermSpec>,
    pub sampling_policy: Vec<String>,
    pub compatibility_notes: Vec<String>,
}
/// Convenience constructor: owns the borrowed pieces of a structured-log
/// field specification.
fn tail_latency_log_field(
    key: &str,
    unit: &str,
    required: bool,
    meaning: &str,
) -> TailLatencyLogFieldSpec {
    TailLatencyLogFieldSpec {
        key: key.to_owned(),
        unit: unit.to_owned(),
        required,
        meaning: meaning.to_owned(),
    }
}
/// Convenience constructor: owns the borrowed pieces of a tail-latency
/// signal specification. The argument count mirrors the spec fields
/// one-to-one, hence the lint allowance.
#[allow(clippy::too_many_arguments)]
fn tail_latency_signal(
    signal_id: &str,
    structured_log_key: &str,
    unit: &str,
    producer_kind: &str,
    producer_symbol: &str,
    producer_file: &str,
    measurement_class: &str,
    core: bool,
    notes: &str,
) -> TailLatencySignalSpec {
    TailLatencySignalSpec {
        signal_id: signal_id.to_owned(),
        structured_log_key: structured_log_key.to_owned(),
        unit: unit.to_owned(),
        producer_kind: producer_kind.to_owned(),
        producer_symbol: producer_symbol.to_owned(),
        producer_file: producer_file.to_owned(),
        measurement_class: measurement_class.to_owned(),
        core,
        notes: notes.to_owned(),
    }
}
/// Static contract data for the "queueing" tail-latency term: backlog
/// before useful work begins.
fn queueing_tail_latency_term() -> TailLatencyTermSpec {
    TailLatencyTermSpec {
        term_id: "queueing".to_string(),
        description:
            "Backlog before useful work begins, spanning ready queues, waiters, and drain queues."
                .to_string(),
        direct_duration_key: "tail.queueing.ns".to_string(),
        attribution_state_key: "tail.queueing.attribution_state".to_string(),
        signals: vec![
            tail_latency_signal(
                "queueing.ready_queue_depth",
                "tail.queueing.ready_queue_depth",
                "count",
                "snapshot_field",
                "asupersync::obligation::lyapunov::StateSnapshot::ready_queue_depth",
                "src/obligation/lyapunov.rs",
                "proxy_signal",
                true,
                "Canonical scheduler backlog proxy used by the three-lane decision contract.",
            ),
            tail_latency_signal(
                "queueing.draining_regions",
                "tail.queueing.draining_regions",
                "count",
                "snapshot_field",
                "asupersync::obligation::lyapunov::StateSnapshot::draining_regions",
                "src/obligation/lyapunov.rs",
                "proxy_signal",
                true,
                "Captures cancellation/finalizer drain backlog that elongates queueing tails.",
            ),
            tail_latency_signal(
                "queueing.bulkhead_queue_depth",
                "tail.queueing.bulkhead_queue_depth",
                "count",
                "stats_struct",
                "asupersync::combinator::bulkhead::BulkheadMetrics::queue_depth",
                "src/combinator/bulkhead.rs",
                "proxy_signal",
                false,
                "Extended queueing proxy for admission-controlled bulkhead lanes.",
            ),
            tail_latency_signal(
                "queueing.pool_waiters",
                "tail.queueing.pool_waiters",
                "count",
                "stats_struct",
                "asupersync::sync::pool::PoolStats::waiters",
                "src/sync/pool.rs",
                "proxy_signal",
                false,
                "Extended backlog proxy for pool acquisition queues.",
            ),
        ],
    }
}
/// Static contract data for the "service" tail-latency term: CPU work
/// once a task is scheduled.
fn service_tail_latency_term() -> TailLatencyTermSpec {
    TailLatencyTermSpec {
        term_id: "service".to_string(),
        description:
            "CPU work once the task is scheduled, including poll consumption and budget burn."
                .to_string(),
        direct_duration_key: "tail.service.ns".to_string(),
        attribution_state_key: "tail.service.attribution_state".to_string(),
        signals: vec![
            tail_latency_signal(
                "service.poll_count",
                "tail.service.poll_count",
                "count",
                "snapshot_field",
                "asupersync::runtime::state::TaskSnapshot::poll_count",
                "src/runtime/state.rs",
                "proxy_signal",
                true,
                "Canonical always-on service proxy derived from task budget consumption.",
            ),
            tail_latency_signal(
                "service.poll_quota_consumed",
                "tail.service.poll_quota_consumed",
                "quota_units",
                "stats_struct",
                "asupersync::observability::resource_accounting::ResourceAccountingSnapshot::poll_quota_consumed",
                "src/observability/resource_accounting.rs",
                "proxy_signal",
                true,
                "Aggregated service-pressure counter for runtime/test emitters.",
            ),
            tail_latency_signal(
                "service.cost_quota_consumed",
                "tail.service.cost_quota_consumed",
                "cost_units",
                "stats_struct",
                "asupersync::observability::resource_accounting::ResourceAccountingSnapshot::cost_quota_consumed",
                "src/observability/resource_accounting.rs",
                "proxy_signal",
                false,
                "Extended service-pressure counter for cost-aware workloads.",
            ),
        ],
    }
}
/// Static contract data for the "io_or_network" tail-latency term:
/// reactor/network wait and drain time.
fn io_or_network_tail_latency_term() -> TailLatencyTermSpec {
    TailLatencyTermSpec {
        term_id: "io_or_network".to_string(),
        description: "Latency spent waiting on or draining reactor/network activity.".to_string(),
        direct_duration_key: "tail.io_or_network.ns".to_string(),
        attribution_state_key: "tail.io_or_network.attribution_state".to_string(),
        signals: vec![
            tail_latency_signal(
                "io_or_network.events_received",
                "tail.io_or_network.events_received",
                "count",
                "stats_struct",
                "asupersync::runtime::io_driver::IoStats::events_received",
                "src/runtime/io_driver.rs",
                "proxy_signal",
                true,
                "Canonical always-on I/O/network pressure proxy from the reactor driver.",
            ),
            tail_latency_signal(
                "io_or_network.polls",
                "tail.io_or_network.polls",
                "count",
                "stats_struct",
                "asupersync::runtime::io_driver::IoStats::polls",
                "src/runtime/io_driver.rs",
                "proxy_signal",
                false,
                "Extended reactor activity proxy for sustained polling pressure.",
            ),
            tail_latency_signal(
                "io_or_network.wakers_dispatched",
                "tail.io_or_network.wakers_dispatched",
                "count",
                "stats_struct",
                "asupersync::runtime::io_driver::IoStats::wakers_dispatched",
                "src/runtime/io_driver.rs",
                "proxy_signal",
                false,
                "Extended proxy for wake fan-out caused by readiness events.",
            ),
        ],
    }
}
/// Static contract data for the "retries" tail-latency term: backoff and
/// reattempt inflation from retry/rate-limit/circuit-breaker loops.
fn retries_tail_latency_term() -> TailLatencyTermSpec {
    TailLatencyTermSpec {
        term_id: "retries".to_string(),
        description:
            "Backoff and reattempt inflation introduced by retry/rate-limit/circuit-breaker control loops."
                .to_string(),
        direct_duration_key: "tail.retries.ns".to_string(),
        attribution_state_key: "tail.retries.attribution_state".to_string(),
        signals: vec![
            tail_latency_signal(
                "retries.total_delay_ns",
                "tail.retries.total_delay_ns",
                "ns",
                "state_field",
                "asupersync::combinator::retry::RetryState::total_delay",
                "src/combinator/retry.rs",
                "direct_duration",
                true,
                "Direct retry-delay contribution from the retry combinator.",
            ),
            tail_latency_signal(
                "retries.rate_limit_total_wait_ns",
                "tail.retries.rate_limit_total_wait_ns",
                "ns",
                "stats_struct",
                "asupersync::combinator::rate_limit::RateLimitMetrics::total_wait_time",
                "src/combinator/rate_limit.rs",
                "direct_duration",
                false,
                "Extended direct delay when token-bucket admission defers work.",
            ),
            tail_latency_signal(
                "retries.circuit_rejected_total",
                "tail.retries.circuit_rejected_total",
                "count",
                "stats_struct",
                "asupersync::combinator::circuit_breaker::CircuitBreakerMetrics::total_rejected",
                "src/combinator/circuit_breaker.rs",
                "proxy_signal",
                false,
                "Extended retry/control-loop pressure proxy when open circuits reject work.",
            ),
        ],
    }
}
/// Static contract data for the "synchronization" tail-latency term:
/// coordination delay from locks, pools, and obligations.
fn synchronization_tail_latency_term() -> TailLatencyTermSpec {
    TailLatencyTermSpec {
        term_id: "synchronization".to_string(),
        description:
            "Coordination delay from locks, pools, obligations, and cancellation-aware rendezvous."
                .to_string(),
        direct_duration_key: "tail.synchronization.ns".to_string(),
        attribution_state_key: "tail.synchronization.attribution_state".to_string(),
        signals: vec![
            tail_latency_signal(
                "synchronization.lock_wait_ns",
                "tail.synchronization.lock_wait_ns",
                "ns",
                "stats_struct",
                "asupersync::sync::contended_mutex::LockMetricsSnapshot::wait_ns",
                "src/sync/contended_mutex.rs",
                "direct_duration",
                true,
                "Canonical direct synchronization delay from contention-instrumented locks.",
            ),
            tail_latency_signal(
                "synchronization.lock_hold_ns",
                "tail.synchronization.lock_hold_ns",
                "ns",
                "stats_struct",
                "asupersync::sync::contended_mutex::LockMetricsSnapshot::hold_ns",
                "src/sync/contended_mutex.rs",
                "proxy_signal",
                false,
                "Extended proxy for convoying and long critical sections.",
            ),
            tail_latency_signal(
                "synchronization.pool_total_wait_ns",
                "tail.synchronization.pool_total_wait_ns",
                "ns",
                "stats_struct",
                "asupersync::sync::pool::PoolStats::total_wait_time",
                "src/sync/pool.rs",
                "direct_duration",
                false,
                "Extended direct delay from resource-pool acquisition waits.",
            ),
            tail_latency_signal(
                "synchronization.obligations_pending",
                "tail.synchronization.obligations_pending",
                "count",
                "stats_struct",
                "asupersync::observability::resource_accounting::ResourceAccountingSnapshot::obligations_pending",
                "src/observability/resource_accounting.rs",
                "proxy_signal",
                true,
                "Captures obligation/cancellation backlog that can extend synchronization tails.",
            ),
        ],
    }
}
/// Static contract data for the "allocator_or_cache" tail-latency term:
/// allocator and cache-locality pressure.
fn allocator_or_cache_tail_latency_term() -> TailLatencyTermSpec {
    TailLatencyTermSpec {
        term_id: "allocator_or_cache".to_string(),
        description:
            "Allocator and cache-locality pressure observable from region-heap churn and memory high-water marks."
                .to_string(),
        direct_duration_key: "tail.allocator_or_cache.ns".to_string(),
        attribution_state_key: "tail.allocator_or_cache.attribution_state".to_string(),
        signals: vec![
            tail_latency_signal(
                "allocator_or_cache.live_allocations",
                "tail.allocator_or_cache.live_allocations",
                "count",
                "stats_struct",
                "asupersync::runtime::region_heap::HeapStats::live",
                "src/runtime/region_heap.rs",
                "proxy_signal",
                true,
                "Canonical allocator-pressure proxy from live region-heap allocations.",
            ),
            tail_latency_signal(
                "allocator_or_cache.bytes_live",
                "tail.allocator_or_cache.bytes_live",
                "bytes",
                "stats_struct",
                "asupersync::runtime::region_heap::HeapStats::bytes_live",
                "src/runtime/region_heap.rs",
                "proxy_signal",
                false,
                "Extended allocator-pressure proxy for live retained bytes.",
            ),
            tail_latency_signal(
                "allocator_or_cache.heap_bytes_peak",
                "tail.allocator_or_cache.heap_bytes_peak",
                "bytes",
                "stats_struct",
                "asupersync::observability::resource_accounting::ResourceAccountingSnapshot::heap_bytes_peak",
                "src/observability/resource_accounting.rs",
                "proxy_signal",
                false,
                "Extended region-level memory high-water mark for cache/allocator analysis.",
            ),
        ],
    }
}
/// Static contract data for the "unknown" tail-latency term: the
/// residual bucket that keeps unattributed latency visible.
fn unknown_tail_latency_term() -> TailLatencyTermSpec {
    TailLatencyTermSpec {
        term_id: "unknown".to_string(),
        description:
            "Residual latency that remains unattributed after measured terms and proxies are accounted for."
                .to_string(),
        direct_duration_key: "tail.unknown.unmeasured_ns".to_string(),
        attribution_state_key: "tail.unknown.attribution_state".to_string(),
        signals: vec![tail_latency_signal(
            "unknown.unmeasured_ns",
            "tail.unknown.unmeasured_ns",
            "ns",
            "contract_field",
            "asupersync::observability::diagnostics::tail_latency_taxonomy_contract",
            "src/observability/diagnostics.rs",
            "unknown_bucket",
            true,
            "Must be emitted whenever any term lacks direct attribution so latency does not disappear from evidence bundles.",
        )],
    }
}
/// Builds the complete, versioned tail-latency taxonomy contract:
/// the additive latency equation, the required structured-log fields,
/// all seven terms, and the sampling/compatibility policy notes.
#[must_use]
pub fn tail_latency_taxonomy_contract() -> TailLatencyTaxonomyContract {
    TailLatencyTaxonomyContract {
        contract_version: TAIL_LATENCY_TAXONOMY_CONTRACT_VERSION.to_string(),
        equation: "tail_latency_ns = queueing_ns + service_ns + io_or_network_ns + retries_ns + synchronization_ns + allocator_or_cache_ns + unknown_ns".to_string(),
        total_latency_key: "tail.total_latency_ns".to_string(),
        unknown_bucket_key: "tail.unknown.unmeasured_ns".to_string(),
        required_log_fields: vec![
            tail_latency_log_field(
                "tail.contract_version",
                "schema_id",
                true,
                "Versioned tail-latency taxonomy contract identifier.",
            ),
            tail_latency_log_field(
                "tail.total_latency_ns",
                "ns",
                true,
                "Observed end-to-end tail latency for the operation under analysis.",
            ),
            tail_latency_log_field(
                "tail.queueing.ready_queue_depth",
                "count",
                true,
                "Always-on queueing proxy based on runnable backlog.",
            ),
            tail_latency_log_field(
                "tail.service.poll_count",
                "count",
                true,
                "Service-side work proxy based on task poll demand.",
            ),
            tail_latency_log_field(
                "tail.io_or_network.events_received",
                "count",
                true,
                "I/O or network pressure proxy based on reactor event volume.",
            ),
            tail_latency_log_field(
                "tail.retries.total_delay_ns",
                "ns",
                true,
                "Direct retry/backoff delay accumulated by retry combinators.",
            ),
            tail_latency_log_field(
                "tail.synchronization.lock_wait_ns",
                "ns",
                true,
                "Direct synchronization delay from contention-instrumented locks.",
            ),
            tail_latency_log_field(
                "tail.allocator_or_cache.live_allocations",
                "count",
                true,
                "Allocator/cache pressure proxy based on live region-heap allocations.",
            ),
            tail_latency_log_field(
                "tail.unknown.unmeasured_ns",
                "ns",
                true,
                "Residual latency that remains unattributed after measured terms and proxies are recorded.",
            ),
        ],
        terms: vec![
            queueing_tail_latency_term(),
            service_tail_latency_term(),
            io_or_network_tail_latency_term(),
            retries_tail_latency_term(),
            synchronization_tail_latency_term(),
            allocator_or_cache_tail_latency_term(),
            unknown_tail_latency_term(),
        ],
        sampling_policy: vec![
            "Always emit the required core fields for any tail-latency event, even when extended observability sampling is disabled.".to_string(),
            "Extended fields may be sampled or emitted only in replay/forensics modes, but they must retain the stable keys defined here.".to_string(),
            "If a direct-duration field is unavailable for a term, preserve proxy signals and roll the residual duration into tail.unknown.unmeasured_ns.".to_string(),
        ],
        compatibility_notes: vec![
            "Structured-log keys are append-only within a contract version; removals or unit changes require a new contract version.".to_string(),
            "Proxy signals are not interchangeable with direct-duration fields; emitters must preserve both semantics explicitly.".to_string(),
            "Unknown contribution is mandatory whenever attribution is incomplete so downstream controllers never treat missing data as zero.".to_string(),
        ],
    }
}
/// Borrowed view of one baseline-contract structured log event.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BaselineLogEvent<'a> {
    pub flow_id: &'a str,
    pub event_kind: &'a str,
    pub outcome_class: &'a str,
}
/// Inconsistencies detected while classifying a baseline log event.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum AdvancedClassificationConflict {
    // The event kind does not belong to the named flow.
    FlowEventMismatch {
        flow_id: String,
        event_kind: String,
    },
    // The outcome class does not fit the event kind.
    OutcomeEventMismatch {
        event_kind: String,
        outcome_class: String,
    },
}
/// Result of classifying one baseline log event against the advanced
/// observability contract.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AdvancedLogClassification {
    pub event_class: AdvancedEventClass,
    pub severity: AdvancedSeverity,
    pub dimensions: Vec<TroubleshootingDimension>,
    pub narrative: String,
    pub recommended_action: String,
    pub conflicts: Vec<AdvancedClassificationConflict>,
}
/// Baseline-contract flow identifiers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BaselineFlowId {
    Execution,
    Integration,
    Remediation,
    Replay,
}
impl BaselineFlowId {
    /// Maps a baseline flow-id string to its enum value; `None` for
    /// anything outside the contract vocabulary.
    fn parse(raw: &str) -> Option<Self> {
        let flow = match raw {
            "execution" => Self::Execution,
            "integration" => Self::Integration,
            "remediation" => Self::Remediation,
            "replay" => Self::Replay,
            _ => return None,
        };
        Some(flow)
    }
}
/// Baseline event kinds recognized by the classifier, listed alphabetically.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BaselineEventKind {
    CommandComplete,
    CommandStart,
    IntegrationError,
    IntegrationSync,
    RemediationApply,
    RemediationVerify,
    ReplayComplete,
    ReplayStart,
    VerificationSummary,
}

impl BaselineEventKind {
    /// Maps a raw event-kind string onto its variant; `None` when unrecognized.
    fn parse(raw: &str) -> Option<Self> {
        let kind = match raw {
            "command_complete" => Self::CommandComplete,
            "command_start" => Self::CommandStart,
            "integration_error" => Self::IntegrationError,
            "integration_sync" => Self::IntegrationSync,
            "remediation_apply" => Self::RemediationApply,
            "remediation_verify" => Self::RemediationVerify,
            "replay_complete" => Self::ReplayComplete,
            "replay_start" => Self::ReplayStart,
            "verification_summary" => Self::VerificationSummary,
            _ => return None,
        };
        Some(kind)
    }
}
/// Baseline outcome classes recognized by the classifier.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BaselineOutcomeClass {
    Cancelled,
    Failed,
    Success,
}

impl BaselineOutcomeClass {
    /// Maps a raw outcome-class string onto its variant; `None` when unrecognized.
    fn parse(raw: &str) -> Option<Self> {
        [
            ("cancelled", Self::Cancelled),
            ("failed", Self::Failed),
            ("success", Self::Success),
        ]
        .into_iter()
        .find_map(|(name, class)| (name == raw).then_some(class))
    }
}
/// Builds the static advanced observability contract: the event classes,
/// severity semantics, troubleshooting dimensions, and compatibility rules
/// layered on top of the baseline log contract.
///
/// Version strings come from the file-level `ADVANCED_OBSERVABILITY_*`
/// constants (declared elsewhere in this file).
#[must_use]
pub fn advanced_observability_contract() -> AdvancedObservabilityContract {
    AdvancedObservabilityContract {
        contract_version: ADVANCED_OBSERVABILITY_CONTRACT_VERSION.to_string(),
        baseline_contract_version: ADVANCED_OBSERVABILITY_BASELINE_VERSION.to_string(),
        // One spec per `AdvancedEventClass` variant.
        event_classes: vec![
            AdvancedEventClassSpec {
                class_id: AdvancedEventClass::CommandLifecycle.as_str().to_string(),
                description: "Execution command lifecycle and gate telemetry.".to_string(),
            },
            AdvancedEventClassSpec {
                class_id: AdvancedEventClass::IntegrationReliability
                    .as_str()
                    .to_string(),
                description: "Cross-system integration health and boundary reliability."
                    .to_string(),
            },
            AdvancedEventClassSpec {
                class_id: AdvancedEventClass::RemediationSafety.as_str().to_string(),
                description: "Remediation safety, application, and post-fix verification."
                    .to_string(),
            },
            AdvancedEventClassSpec {
                class_id: AdvancedEventClass::ReplayDeterminism.as_str().to_string(),
                description: "Replay lifecycle and deterministic reproducibility.".to_string(),
            },
            AdvancedEventClassSpec {
                class_id: AdvancedEventClass::VerificationGovernance
                    .as_str()
                    .to_string(),
                description: "Verification summary and governance gate posture.".to_string(),
            },
        ],
        // One spec per `AdvancedSeverity` level, defining what each level means.
        severity_semantics: vec![
            AdvancedSeveritySpec {
                severity: AdvancedSeverity::Critical.as_str().to_string(),
                meaning: "Contract/taxonomy contradiction requiring immediate correction."
                    .to_string(),
            },
            AdvancedSeveritySpec {
                severity: AdvancedSeverity::Error.as_str().to_string(),
                meaning: "Actionable failure impacting reliability or correctness.".to_string(),
            },
            AdvancedSeveritySpec {
                severity: AdvancedSeverity::Info.as_str().to_string(),
                meaning: "Expected state transition with no direct intervention required."
                    .to_string(),
            },
            AdvancedSeveritySpec {
                severity: AdvancedSeverity::Warning.as_str().to_string(),
                meaning: "Non-terminal issue or cancellation requiring review.".to_string(),
            },
        ],
        // One spec per `TroubleshootingDimension`, stating its diagnostic purpose.
        troubleshooting_dimensions: vec![
            TroubleshootingDimensionSpec {
                dimension: TroubleshootingDimension::CancellationPath
                    .as_str()
                    .to_string(),
                purpose: "Track request/drain/finalize behavior for cancelled runs.".to_string(),
            },
            TroubleshootingDimensionSpec {
                dimension: TroubleshootingDimension::ContractCompliance
                    .as_str()
                    .to_string(),
                purpose: "Validate schema, gate, and policy conformance.".to_string(),
            },
            TroubleshootingDimensionSpec {
                dimension: TroubleshootingDimension::Determinism.as_str().to_string(),
                purpose: "Confirm replay stability and deterministic artifact lineage.".to_string(),
            },
            TroubleshootingDimensionSpec {
                dimension: TroubleshootingDimension::ExternalDependency
                    .as_str()
                    .to_string(),
                purpose: "Isolate third-party/system boundary failures.".to_string(),
            },
            TroubleshootingDimensionSpec {
                dimension: TroubleshootingDimension::OperatorAction
                    .as_str()
                    .to_string(),
                purpose: "Prioritize immediate operator decision paths.".to_string(),
            },
            TroubleshootingDimensionSpec {
                dimension: TroubleshootingDimension::RecoveryPlanning
                    .as_str()
                    .to_string(),
                purpose: "Drive remediation and verify-after-change sequencing.".to_string(),
            },
            TroubleshootingDimensionSpec {
                dimension: TroubleshootingDimension::RuntimeInvariant
                    .as_str()
                    .to_string(),
                purpose: "Connect events to runtime invariant health.".to_string(),
            },
        ],
        // Evolution rules consumers may rely on across contract versions.
        compatibility_notes: vec![
            "Additive dimensions/classes may be introduced without baseline schema changes."
                .to_string(),
            "Field removals or semantic redefinitions require a contract-version bump.".to_string(),
            "Unknown baseline flow/event/outcome values are hard validation errors.".to_string(),
        ],
    }
}
/// Classifies one baseline structured-log event into the advanced taxonomy
/// (event class, severity, troubleshooting dimensions, narrative, and a
/// recommended operator action).
///
/// # Errors
///
/// Returns a human-readable `Err(String)` when `flow_id`, `event_kind`, or
/// `outcome_class` is not a recognized baseline value.
pub fn classify_baseline_log_event(
    event: BaselineLogEvent<'_>,
) -> Result<AdvancedLogClassification, String> {
    // Unknown baseline identifiers are hard validation errors per the contract.
    let flow = BaselineFlowId::parse(event.flow_id)
        .ok_or_else(|| format!("unknown flow_id {}", event.flow_id))?;
    let kind = BaselineEventKind::parse(event.event_kind)
        .ok_or_else(|| format!("unknown event_kind {}", event.event_kind))?;
    let outcome = BaselineOutcomeClass::parse(event.outcome_class)
        .ok_or_else(|| format!("unknown outcome_class {}", event.outcome_class))?;
    // The kind supplies the event class, seed dimensions, and narrative/action text.
    let (event_class, mut dimensions, kind_narrative, action_hint) = kind_semantics(kind);
    let mut conflicts = Vec::new();
    // Base severity derives from the outcome alone; conflicts may escalate it below.
    let mut severity = match outcome {
        BaselineOutcomeClass::Success => AdvancedSeverity::Info,
        BaselineOutcomeClass::Cancelled => AdvancedSeverity::Warning,
        BaselineOutcomeClass::Failed => AdvancedSeverity::Error,
    };
    // An event kind outside its flow's allow-list is a taxonomy contradiction:
    // force Critical severity and flag contract compliance.
    if !flow_allows_event(flow, kind) {
        conflicts.push(AdvancedClassificationConflict::FlowEventMismatch {
            flow_id: event.flow_id.to_string(),
            event_kind: event.event_kind.to_string(),
        });
        severity = AdvancedSeverity::Critical;
        dimensions.push(TroubleshootingDimension::ContractCompliance);
    }
    // A "successful" integration_error contradicts itself; escalate to at least
    // Error. NOTE(review): `max` assumes `AdvancedSeverity`'s ordering ranks
    // Critical above Error — the enum is declared elsewhere; confirm its order.
    if kind == BaselineEventKind::IntegrationError && outcome == BaselineOutcomeClass::Success {
        conflicts.push(AdvancedClassificationConflict::OutcomeEventMismatch {
            event_kind: event.event_kind.to_string(),
            outcome_class: event.outcome_class.to_string(),
        });
        severity = severity.max(AdvancedSeverity::Error);
        dimensions.push(TroubleshootingDimension::ContractCompliance);
    }
    if outcome == BaselineOutcomeClass::Cancelled {
        dimensions.push(TroubleshootingDimension::CancellationPath);
    }
    if outcome == BaselineOutcomeClass::Failed {
        dimensions.push(TroubleshootingDimension::RecoveryPlanning);
    }
    // Deterministic output: dedupe dimensions and order conflicts stably.
    dimensions.sort_unstable();
    dimensions.dedup();
    conflicts.sort();
    let outcome_phrase = match outcome {
        BaselineOutcomeClass::Success => "completed successfully",
        BaselineOutcomeClass::Cancelled => "was cancelled",
        BaselineOutcomeClass::Failed => "failed",
    };
    Ok(AdvancedLogClassification {
        event_class,
        severity,
        dimensions,
        narrative: format!(
            "{}:{} {}. {}",
            event.flow_id, event.event_kind, outcome_phrase, kind_narrative
        ),
        recommended_action: if conflicts.is_empty() {
            action_hint.to_string()
        } else {
            format!(
                "{action_hint} Resolve taxonomy conflicts before trusting downstream automation."
            )
        },
        conflicts,
    })
}
/// Classifies a batch of baseline events, short-circuiting on the first
/// unrecognized event.
///
/// # Errors
///
/// Propagates the first validation error from `classify_baseline_log_event`.
pub fn classify_baseline_log_events(
    events: &[BaselineLogEvent<'_>],
) -> Result<Vec<AdvancedLogClassification>, String> {
    let mut classified = Vec::with_capacity(events.len());
    for event in events {
        classified.push(classify_baseline_log_event(*event)?);
    }
    Ok(classified)
}
fn flow_allows_event(flow: BaselineFlowId, kind: BaselineEventKind) -> bool {
match flow {
BaselineFlowId::Execution => matches!(
kind,
BaselineEventKind::CommandComplete
| BaselineEventKind::CommandStart
| BaselineEventKind::VerificationSummary
),
BaselineFlowId::Integration => matches!(
kind,
BaselineEventKind::IntegrationError
| BaselineEventKind::IntegrationSync
| BaselineEventKind::VerificationSummary
),
BaselineFlowId::Remediation => matches!(
kind,
BaselineEventKind::RemediationApply
| BaselineEventKind::RemediationVerify
| BaselineEventKind::VerificationSummary
),
BaselineFlowId::Replay => matches!(
kind,
BaselineEventKind::ReplayComplete
| BaselineEventKind::ReplayStart
| BaselineEventKind::VerificationSummary
),
}
}
/// Static semantics for each baseline event kind: the advanced event class it
/// maps to, its seed troubleshooting dimensions, a narrative fragment, and
/// the default recommended action. Callers may append further dimensions
/// based on outcome/conflicts.
fn kind_semantics(
    kind: BaselineEventKind,
) -> (
    AdvancedEventClass,
    Vec<TroubleshootingDimension>,
    &'static str,
    &'static str,
) {
    match kind {
        BaselineEventKind::CommandComplete => (
            AdvancedEventClass::CommandLifecycle,
            vec![
                TroubleshootingDimension::ContractCompliance,
                TroubleshootingDimension::OperatorAction,
            ],
            "Execution gate completed and emitted a deterministic artifact pointer",
            "Review gate summary and continue pipeline progression.",
        ),
        BaselineEventKind::CommandStart => (
            AdvancedEventClass::CommandLifecycle,
            vec![TroubleshootingDimension::OperatorAction],
            "Execution gate started with reproducible command provenance",
            "Monitor for completion and verify emitted command provenance.",
        ),
        BaselineEventKind::IntegrationError => (
            AdvancedEventClass::IntegrationReliability,
            vec![
                TroubleshootingDimension::ExternalDependency,
                TroubleshootingDimension::OperatorAction,
            ],
            "Integration boundary reported an error at an external/system edge",
            "Inspect integration target, retry posture, and boundary adapter diagnostics.",
        ),
        BaselineEventKind::IntegrationSync => (
            AdvancedEventClass::IntegrationReliability,
            vec![TroubleshootingDimension::ExternalDependency],
            "Integration synchronization event captured adapter boundary state",
            "Verify upstream/downstream contract alignment for this sync point.",
        ),
        BaselineEventKind::RemediationApply => (
            AdvancedEventClass::RemediationSafety,
            vec![
                TroubleshootingDimension::ContractCompliance,
                TroubleshootingDimension::RecoveryPlanning,
            ],
            "Remediation apply phase executed against diagnosed findings",
            "Confirm changes are scoped and queue remediation verification.",
        ),
        BaselineEventKind::RemediationVerify => (
            AdvancedEventClass::RemediationSafety,
            vec![
                TroubleshootingDimension::ContractCompliance,
                TroubleshootingDimension::RecoveryPlanning,
            ],
            "Post-remediation verification assessed health deltas and invariants",
            "Evaluate health delta and close or reopen remediation loops.",
        ),
        BaselineEventKind::ReplayComplete => (
            AdvancedEventClass::ReplayDeterminism,
            vec![
                TroubleshootingDimension::Determinism,
                TroubleshootingDimension::RuntimeInvariant,
            ],
            "Replay completion captured deterministic scenario convergence status",
            "Compare replay artifacts against baseline and investigate divergence.",
        ),
        BaselineEventKind::ReplayStart => (
            AdvancedEventClass::ReplayDeterminism,
            vec![TroubleshootingDimension::Determinism],
            "Replay start established deterministic execution context",
            "Track replay progress and preserve trace/evidence join keys.",
        ),
        BaselineEventKind::VerificationSummary => (
            AdvancedEventClass::VerificationGovernance,
            vec![
                TroubleshootingDimension::ContractCompliance,
                TroubleshootingDimension::Determinism,
                TroubleshootingDimension::RuntimeInvariant,
            ],
            "Verification summary synthesized gate outcomes for governance review",
            "Use summary to decide promotion, rollback, or targeted investigation.",
        ),
    }
}
#[cfg(test)]
#[allow(clippy::arc_with_non_send_sync)]
mod tests {
use super::*;
use crate::observability::spectral_health::HealthClassification;
use crate::record::obligation::{ObligationKind, ObligationRecord};
use crate::record::region::RegionRecord;
use crate::record::task::{TaskRecord, TaskState};
use crate::time::{TimerDriverHandle, VirtualClock};
use crate::types::{Budget, CancelReason, Outcome};
use crate::util::ArenaIndex;
use serde_json::{Value, json};
use std::fmt::Write as _;
use std::sync::Arc;
/// Shared test preamble: installs test logging and records the named test phase.
fn init_test(name: &str) {
    crate::test_utils::init_test_logging();
    crate::test_phase!(name);
}
/// Test fixture: inserts a child region under `parent` and links it into the
/// parent's child list, returning the new region's id.
///
/// The record is inserted with a placeholder id and patched afterwards,
/// because the real id is derived from the arena slot chosen by `insert`.
fn insert_child_region(state: &mut RuntimeState, parent: RegionId) -> RegionId {
    let idx = state.regions.insert(RegionRecord::new(
        RegionId::from_arena(ArenaIndex::new(0, 0)),
        Some(parent),
        Budget::INFINITE,
    ));
    let id = RegionId::from_arena(idx);
    let record = state.regions.get_mut(idx).expect("child region missing");
    record.id = id;
    // Wire the child into the parent; failure here means the fixture is broken.
    let added = state
        .regions
        .get(parent.arena_index())
        .expect("parent missing")
        .add_child(id);
    crate::assert_with_log!(added.is_ok(), "child added", true, added.is_ok());
    id
}
/// Test fixture: inserts a task into `region` with the given initial state
/// and registers it with the region, returning the new task's id.
///
/// Uses the same insert-then-patch-id pattern as `insert_child_region`: the
/// real id only exists after the arena assigns a slot.
fn insert_task(state: &mut RuntimeState, region: RegionId, task_state: TaskState) -> TaskId {
    let idx = state.insert_task(TaskRecord::new(
        TaskId::from_arena(ArenaIndex::new(0, 0)),
        region,
        Budget::INFINITE,
    ));
    let id = TaskId::from_arena(idx);
    let record = state.task_mut(id).expect("task missing");
    record.id = id;
    record.state = task_state;
    let added = state
        .regions
        .get(region.arena_index())
        .expect("region missing")
        .add_task(id);
    crate::assert_with_log!(added.is_ok(), "task added", true, added.is_ok());
    id
}
/// Test fixture: inserts an obligation of `kind` held by `holder` in `region`
/// with the given reservation time, returning the new obligation's id.
fn insert_obligation(
    state: &mut RuntimeState,
    region: RegionId,
    holder: TaskId,
    kind: ObligationKind,
    reserved_at: Time,
) -> ObligationId {
    let idx = state.obligations.insert(ObligationRecord::new(
        ObligationId::from_arena(ArenaIndex::new(0, 0)),
        kind,
        holder,
        region,
        reserved_at,
    ));
    let id = ObligationId::from_arena(idx);
    // Patch the placeholder id now that the arena slot is known.
    let record = state.obligations.get_mut(idx).expect("obligation missing");
    record.id = id;
    id
}
fn render_structured_diagnostic_report(
scenario: &str,
generated_at: &str,
region: Option<&RegionOpenExplanation>,
task: Option<&TaskBlockedExplanation>,
leaks: &[ObligationLeak],
) -> String {
let mut rendered = String::new();
writeln!(&mut rendered, "scenario: {scenario}").expect("write scenario");
writeln!(&mut rendered, "generated_at: {generated_at}").expect("write timestamp");
rendered.push_str("\n[region]\n");
if let Some(region) = region {
rendered.push_str(®ion.to_string());
} else {
rendered.push_str("none\n");
}
rendered.push_str("\n[task]\n");
if let Some(task) = task {
rendered.push_str(&task.to_string());
} else {
rendered.push_str("none\n");
}
rendered.push_str("\n[leaks]\n");
if leaks.is_empty() {
rendered.push_str("none\n");
} else {
for leak in leaks {
writeln!(
&mut rendered,
"- {:?} region={:?} holder={:?} type={} age_ms={}",
leak.obligation_id,
leak.region_id,
leak.holder_task,
leak.obligation_type,
leak.age.as_millis()
)
.expect("write leak");
}
}
rendered.trim_end().to_string()
}
/// Renders the v2 diagnostic report: a version header followed by one
/// `[label]` section per entry, with trailing whitespace trimmed.
fn render_structured_diagnostic_report_v2(sections: &[(&str, &str)]) -> String {
    let mut out = String::from("report_version: v2");
    for (label, body) in sections {
        out.push_str(&format!("\n\n[{label}]\n"));
        out.push_str(body);
    }
    out.trim_end().to_string()
}
/// Coarse resource counts embedded in v3/v4 report sections.
struct DiagnosticResourceAccounting {
    // All regions ever inserted into the runtime state arena.
    total_regions: usize,
    // Regions whose state is not `Closed`.
    open_regions: usize,
    // All tasks ever inserted.
    total_tasks: usize,
    // Tasks whose state is not terminal.
    live_tasks: usize,
    // All obligations ever inserted.
    total_obligations: usize,
    // Leak count supplied by the caller (typically `find_leaked_obligations`).
    leaked_obligations: usize,
}
/// One scenario section of a v3 report: a label, a status string
/// ("passing"/"degraded"/"critical"), its resource accounting, and the
/// pre-rendered scenario report body.
struct DiagnosticReportV3Section<'a> {
    label: &'a str,
    status: &'a str,
    accounting: DiagnosticResourceAccounting,
    rendered: &'a str,
}
/// Synthetic histogram fixture rendered under `extended_metrics` in v4
/// reports. (Reformatted: fields and the closing brace were jammed onto
/// shared lines.)
#[derive(Debug, Clone)]
struct DiagnosticMetricHistogram {
    // Metric name as rendered in the report.
    name: String,
    // Ordered (bucket label, sample count) pairs.
    buckets: Vec<(String, u64)>,
    // Total samples across all buckets.
    total_count: u64,
    // Ordered (percentile in [0, 1], value) pairs.
    percentiles: Vec<(f64, f64)>,
}
/// One scenario section of a v4 report: identical to the v3 section plus a
/// list of extended metric histograms.
struct DiagnosticReportV4Section<'a> {
    label: &'a str,
    status: &'a str,
    accounting: DiagnosticResourceAccounting,
    histograms: Vec<DiagnosticMetricHistogram>,
    rendered: &'a str,
}
/// Snapshots coarse resource counts (regions, tasks, obligations) from the
/// diagnostics' runtime state; the leak count is supplied by the caller.
fn diagnostic_resource_accounting(
    diagnostics: &Diagnostics,
    leaked_obligations: usize,
) -> DiagnosticResourceAccounting {
    let state = &diagnostics.state;
    let mut accounting = DiagnosticResourceAccounting {
        total_regions: 0,
        open_regions: 0,
        total_tasks: 0,
        live_tasks: 0,
        total_obligations: 0,
        leaked_obligations,
    };
    // Single pass over regions: count everything, and separately whatever is
    // not yet closed.
    for (_, region) in state.regions.iter() {
        accounting.total_regions += 1;
        if region.state() != RegionState::Closed {
            accounting.open_regions += 1;
        }
    }
    // Single pass over tasks: count everything, and separately the live ones.
    for (_, task) in state.tasks_iter() {
        accounting.total_tasks += 1;
        if !task.state.is_terminal() {
            accounting.live_tasks += 1;
        }
    }
    accounting.total_obligations = state.obligations.iter().count();
    accounting
}
/// Renders the v3 multi-scenario diagnostic report: a `[summary]` header with
/// per-status counts, then one `[label]` section per scenario containing its
/// resource accounting and the indented scenario report body.
///
/// Output is byte-stable and consumed by snapshot assertions — do not change
/// the literals without regenerating snapshots.
fn render_structured_diagnostic_report_v3(
    sections: &[DiagnosticReportV3Section<'_>],
) -> String {
    let mut rendered = String::from("report_version: v3");
    // Status counts are computed over the caller-declared status labels.
    let passing_count = sections
        .iter()
        .filter(|section| section.status == "passing")
        .count();
    let degraded_count = sections
        .iter()
        .filter(|section| section.status == "degraded")
        .count();
    let critical_count = sections
        .iter()
        .filter(|section| section.status == "critical")
        .count();
    writeln!(&mut rendered, "\n\n[summary]").expect("write summary label");
    writeln!(&mut rendered, "scenario_count: {}", sections.len())
        .expect("write scenario count");
    writeln!(&mut rendered, "passing_count: {passing_count}").expect("write passing count");
    writeln!(&mut rendered, "degraded_count: {degraded_count}").expect("write degraded count");
    writeln!(&mut rendered, "critical_count: {critical_count}").expect("write critical count");
    for section in sections {
        writeln!(&mut rendered, "\n\n[{}]", section.label).expect("write section label");
        writeln!(&mut rendered, "status: {}", section.status).expect("write status");
        rendered.push_str("resource_accounting:\n");
        writeln!(
            &mut rendered,
            " - regions_total: {}",
            section.accounting.total_regions
        )
        .expect("write total regions");
        writeln!(
            &mut rendered,
            " - regions_open: {}",
            section.accounting.open_regions
        )
        .expect("write open regions");
        writeln!(
            &mut rendered,
            " - tasks_total: {}",
            section.accounting.total_tasks
        )
        .expect("write total tasks");
        writeln!(
            &mut rendered,
            " - tasks_live: {}",
            section.accounting.live_tasks
        )
        .expect("write live tasks");
        writeln!(
            &mut rendered,
            " - obligations_total: {}",
            section.accounting.total_obligations
        )
        .expect("write total obligations");
        writeln!(
            &mut rendered,
            " - obligations_leaked: {}",
            section.accounting.leaked_obligations
        )
        .expect("write leaked obligations");
        rendered.push_str("report:\n");
        // Indent every line of the pre-rendered scenario body under "report:".
        for line in section.rendered.lines() {
            writeln!(&mut rendered, " {line}").expect("write report line")
        }
    }
    rendered.trim_end().to_string()
}
fn render_structured_diagnostic_report_v4(
sections: &[DiagnosticReportV4Section<'_>],
) -> String {
let mut rendered = String::from("report_version: v4");
let passing_count = sections
.iter()
.filter(|section| section.status == "passing")
.count();
let degraded_count = sections
.iter()
.filter(|section| section.status == "degraded")
.count();
let critical_count = sections
.iter()
.filter(|section| section.status == "critical")
.count();
writeln!(&mut rendered, "\n\n[summary]").expect("write summary label");
writeln!(&mut rendered, "scenario_count: {}", sections.len())
.expect("write scenario count");
writeln!(&mut rendered, "passing_count: {passing_count}").expect("write passing count");
writeln!(&mut rendered, "degraded_count: {degraded_count}").expect("write degraded count");
writeln!(&mut rendered, "critical_count: {critical_count}").expect("write critical count");
let total_histogram_count: u64 = sections
.iter()
.flat_map(|section| §ion.histograms)
.map(|h| h.total_count)
.sum();
writeln!(
&mut rendered,
"total_histogram_samples: {total_histogram_count}"
)
.expect("write total histogram samples");
let histogram_types: std::collections::BTreeSet<&String> = sections
.iter()
.flat_map(|section| §ion.histograms)
.map(|h| &h.name)
.collect();
writeln!(
&mut rendered,
"histogram_types: [{}]",
histogram_types
.iter()
.map(|s| s.as_str())
.collect::<Vec<_>>()
.join(", ")
)
.expect("write histogram types");
for section in sections {
writeln!(&mut rendered, "\n\n[{}]", section.label).expect("write section label");
writeln!(&mut rendered, "status: {}", section.status).expect("write status");
rendered.push_str("resource_accounting:\n");
writeln!(
&mut rendered,
" - regions_total: {}",
section.accounting.total_regions
)
.expect("write total regions");
writeln!(
&mut rendered,
" - regions_open: {}",
section.accounting.open_regions
)
.expect("write open regions");
writeln!(
&mut rendered,
" - tasks_total: {}",
section.accounting.total_tasks
)
.expect("write total tasks");
writeln!(
&mut rendered,
" - tasks_live: {}",
section.accounting.live_tasks
)
.expect("write live tasks");
writeln!(
&mut rendered,
" - obligations_total: {}",
section.accounting.total_obligations
)
.expect("write total obligations");
writeln!(
&mut rendered,
" - obligations_leaked: {}",
section.accounting.leaked_obligations
)
.expect("write leaked obligations");
if !section.histograms.is_empty() {
rendered.push_str("extended_metrics:\n");
for histogram in §ion.histograms {
writeln!(&mut rendered, " - name: {}", histogram.name)
.expect("write histogram name");
writeln!(
&mut rendered,
" total_samples: {}",
histogram.total_count
)
.expect("write histogram total");
if !histogram.buckets.is_empty() {
rendered.push_str(" buckets:\n");
for (bucket_name, count) in &histogram.buckets {
writeln!(&mut rendered, " - {}: {}", bucket_name, count)
.expect("write bucket");
}
}
if !histogram.percentiles.is_empty() {
rendered.push_str(" percentiles:\n");
for (percentile, value) in &histogram.percentiles {
writeln!(
&mut rendered,
" - p{:.1}: {:.3}",
percentile * 100.0,
value
)
.expect("write percentile");
}
}
}
}
rendered.push_str("report:\n");
for line in section.rendered.lines() {
writeln!(&mut rendered, " {line}").expect("write report line");
}
}
rendered.trim_end().to_string()
}
/// Replaces volatile timestamp lines in a rendered report with `<scrubbed>`
/// placeholders so snapshots stay stable across runs.
///
/// `generated_at` matches only at the start of an unindented line; the
/// detail keys match after stripping leading whitespace and an optional
/// `"- "` bullet, and are rewritten to a canonical bulleted form.
fn scrub_diagnostic_report_timestamps(rendered: &str) -> String {
    let mut scrubbed_lines: Vec<String> = Vec::new();
    for line in rendered.lines() {
        let trimmed = line.trim_start();
        let detail = trimmed.strip_prefix("- ").unwrap_or(trimmed);
        let replacement = if line.starts_with("generated_at: ") {
            Some("generated_at: <scrubbed>")
        } else if detail.starts_with("next_retry_at: ") {
            Some(" - next_retry_at: <scrubbed>")
        } else if detail.starts_with("observed_at: ") {
            Some(" - observed_at: <scrubbed>")
        } else if detail.starts_with("deadline_at: ") {
            Some(" - deadline_at: <scrubbed>")
        } else {
            None
        };
        scrubbed_lines.push(replacement.map_or_else(|| line.to_string(), str::to_string));
    }
    scrubbed_lines.join("\n")
}
/// Asserts a rendered report against an insta snapshot stored under
/// `tests/snapshots` (redirected from the default path), without the module
/// path prepended to the snapshot name.
fn assert_diagnostic_report_snapshot(snapshot_name: &str, rendered: &str) {
    insta::with_settings!({
        snapshot_path => "../../tests/snapshots",
        prepend_module_to_snapshot => false,
    }, {
        insta::assert_snapshot!(snapshot_name, rendered);
    });
}
/// Test fixture: inserts `len` running tasks in `region` and chains them into
/// a wait path — each task's `waiters` list gains the next task in the chain.
/// Returns the tasks in path order.
fn insert_wait_path(state: &mut RuntimeState, region: RegionId, len: usize) -> Vec<TaskId> {
    let mut tasks = Vec::with_capacity(len);
    for _ in 0..len {
        tasks.push(insert_task(state, region, TaskState::Running));
    }
    // Link consecutive pairs: tasks[i] is awaited by tasks[i + 1].
    for idx in 1..tasks.len() {
        let (source, waiter) = (tasks[idx - 1], tasks[idx]);
        state
            .task_mut(source)
            .expect("path task missing")
            .waiters
            .push(waiter);
    }
    tasks
}
/// Rounds a floating-point metric to three decimal places so snapshot output
/// stays stable across platforms.
fn round_metric(value: f64) -> f64 {
    const SCALE: f64 = 1_000.0;
    (value * SCALE).round() / SCALE
}
/// Maps a deadlock severity onto the lowercase label used in rendered
/// reports and JSON snapshots.
fn deadlock_severity_label(severity: DeadlockSeverity) -> &'static str {
    let label = match severity {
        DeadlockSeverity::Critical => "critical",
        DeadlockSeverity::Elevated => "elevated",
        DeadlockSeverity::None => "none",
    };
    label
}
/// Serializes a spectral health classification into the JSON shape used by
/// healthcheck snapshots. Every variant carries a `kind` discriminator;
/// floating-point metrics are rounded via `round_metric` for stability.
fn health_classification_json(classification: &HealthClassification) -> Value {
    match classification {
        HealthClassification::Deadlocked => json!({
            "kind": "deadlocked",
        }),
        HealthClassification::Healthy { margin } => json!({
            "kind": "healthy",
            "margin": round_metric(*margin),
        }),
        HealthClassification::Degraded {
            fiedler,
            bottleneck_nodes,
        } => json!({
            "kind": "degraded",
            "fiedler": round_metric(*fiedler),
            "bottleneck_nodes": bottleneck_nodes,
        }),
        HealthClassification::Critical {
            fiedler,
            approaching_disconnect,
        } => json!({
            "kind": "critical",
            "fiedler": round_metric(*fiedler),
            "approaching_disconnect": approaching_disconnect,
        }),
        HealthClassification::Fragmented { components } => json!({
            "kind": "fragmented",
            "components": components,
        }),
    }
}
/// Collapses spectral health, directional deadlock risk, and obligation-leak
/// counts into the three-valued report status: "critical" beats "degraded"
/// beats "passing".
fn overall_health_status(
    health: &SpectralHealthReport,
    deadlock: &DirectionalDeadlockReport,
    leak_count: usize,
) -> &'static str {
    let spectral_critical = matches!(
        health.classification,
        HealthClassification::Deadlocked
            | HealthClassification::Critical { .. }
            | HealthClassification::Fragmented { .. }
    );
    if spectral_critical || deadlock.severity == DeadlockSeverity::Critical {
        return "critical";
    }
    let spectral_degraded =
        matches!(health.classification, HealthClassification::Degraded { .. });
    if spectral_degraded || leak_count > 0 || deadlock.severity == DeadlockSeverity::Elevated {
        return "degraded";
    }
    "passing"
}
/// Builds the full healthcheck JSON document for snapshot tests: overall
/// status plus structural-health, directional-deadlock, region, and
/// obligation sections derived from the live `Diagnostics` analyses.
///
/// The key set is snapshot-stable; volatile fields (`generated_at`, `pid`)
/// are caller-supplied so they can be scrubbed afterwards.
fn render_diagnostic_healthcheck_json(
    diagnostics: &Diagnostics,
    region_id: RegionId,
    generated_at: &str,
    pid: u32,
) -> Value {
    let health = diagnostics.analyze_structural_health();
    let deadlock = diagnostics.analyze_directional_deadlock();
    let region = diagnostics.explain_region_open(region_id);
    let leaks = diagnostics.find_leaked_obligations();
    let leak_count = leaks.len();
    // Saturate to u64::MAX rather than panic on a pathological u128 age.
    let max_age_ms = leaks
        .iter()
        .map(|leak| u64::try_from(leak.age.as_millis()).unwrap_or(u64::MAX))
        .max()
        .unwrap_or(0);
    json!({
        "generated_at": generated_at,
        "pid": pid,
        "status": overall_health_status(&health, &deadlock, leak_count),
        "structural_health": {
            "classification": health_classification_json(&health.classification),
            "fiedler_value": round_metric(health.decomposition.fiedler_value),
            "spectral_gap": round_metric(health.decomposition.spectral_gap),
            "spectral_radius": round_metric(health.decomposition.spectral_radius),
            "iterations_used": health.decomposition.iterations_used,
            "bottleneck_count": health.bottlenecks.len(),
        },
        "directional_deadlock": {
            "severity": deadlock_severity_label(deadlock.severity),
            "risk_score": round_metric(deadlock.risk_score),
            "cycle_count": deadlock.cycles.len(),
            "trapped_cycle_count": deadlock.cycles.iter().filter(|cycle| cycle.trapped).count(),
            "cycles": deadlock.cycles.iter().map(|cycle| json!({
                "tasks": cycle.tasks.iter().map(|task| format!("{task:?}")).collect::<Vec<_>>(),
                "trapped": cycle.trapped,
                "ingress_edges": cycle.ingress_edges,
                "egress_edges": cycle.egress_edges,
            })).collect::<Vec<_>>(),
        },
        "region": {
            "id": format!("{:?}", region.region_id),
            "state": region.region_state.map(|state| format!("{state:?}")),
            "reason_count": region.reasons.len(),
            "recommendation_count": region.recommendations.len(),
        },
        "obligations": {
            "leak_count": leak_count,
            "max_age_ms": max_age_ms,
            "leaks": leaks.iter().map(|leak| json!({
                "obligation_id": format!("{:?}", leak.obligation_id),
                "region_id": format!("{:?}", leak.region_id),
                "holder_task": leak.holder_task.map(|task| format!("{task:?}")),
                "obligation_type": &leak.obligation_type,
                "age_ms": u64::try_from(leak.age.as_millis()).unwrap_or(u64::MAX),
            })).collect::<Vec<_>>(),
        },
    })
}
/// Overwrites the volatile top-level fields of a healthcheck document
/// (`generated_at`, `pid`) with `"<scrubbed>"` placeholders; non-object
/// values pass through unchanged.
fn scrub_diagnostic_healthcheck_json(mut value: Value) -> Value {
    if let Some(object) = value.as_object_mut() {
        for key in ["generated_at", "pid"] {
            object.insert(key.to_string(), Value::String("<scrubbed>".to_string()));
        }
    }
    value
}
/// Asserts a healthcheck JSON document against an insta JSON snapshot under
/// `tests/snapshots`, without the module path prepended to the snapshot name.
fn assert_diagnostic_healthcheck_snapshot(snapshot_name: &str, value: &Value) {
    insta::with_settings!({
        snapshot_path => "../../tests/snapshots",
        prepend_module_to_snapshot => false,
    }, {
        insta::assert_json_snapshot!(snapshot_name, value);
    });
}
/// `explain_region_open` on an id that was never allocated must report no
/// region state, exactly one `RegionNotFound` reason, and a recommendation
/// to double-check the id.
#[test]
fn test_explain_region_open_unknown_region_returns_reason() {
    init_test("test_explain_region_open_unknown_region_returns_reason");
    let state = Arc::new(RuntimeState::new());
    let diagnostics = Diagnostics::new(state);
    // Arena slot 99 was never allocated, so the lookup must miss.
    let missing = RegionId::new_for_test(99, 0);
    let explanation = diagnostics.explain_region_open(missing);
    crate::assert_with_log!(
        explanation.region_state.is_none(),
        "region_state none",
        true,
        explanation.region_state.is_none()
    );
    crate::assert_with_log!(
        explanation.reasons.len() == 1,
        "single reason",
        1usize,
        explanation.reasons.len()
    );
    let is_not_found = matches!(explanation.reasons.first(), Some(Reason::RegionNotFound));
    crate::assert_with_log!(is_not_found, "region not found reason", true, is_not_found);
    let has_recommendation = explanation
        .recommendations
        .iter()
        .any(|rec| rec.contains("Verify region id"));
    crate::assert_with_log!(
        has_recommendation,
        "recommendation present",
        true,
        has_recommendation
    );
    crate::test_complete!("test_explain_region_open_unknown_region_returns_reason");
}
/// A fully closed region must explain as `Closed` with no open reasons and
/// no recommendations.
#[test]
fn test_explain_region_open_closed_region_has_no_reasons() {
    init_test("test_explain_region_open_closed_region_has_no_reasons");
    let mut state = RuntimeState::new();
    let root = state.create_root_region(Budget::INFINITE);
    let region = state.region(root).expect("root missing");
    // Drive the full close sequence: begin_close -> begin_finalize -> complete_close.
    let did_close =
        region.begin_close(None) && region.begin_finalize() && region.complete_close();
    crate::assert_with_log!(did_close, "region closed", true, did_close);
    let diagnostics = Diagnostics::new(Arc::new(state));
    let explanation = diagnostics.explain_region_open(root);
    crate::assert_with_log!(
        explanation.region_state == Some(RegionState::Closed),
        "closed state",
        true,
        explanation.region_state == Some(RegionState::Closed)
    );
    crate::assert_with_log!(
        explanation.reasons.is_empty(),
        "no reasons",
        true,
        explanation.reasons.is_empty()
    );
    crate::assert_with_log!(
        explanation.recommendations.is_empty(),
        "no recommendations",
        true,
        explanation.recommendations.is_empty()
    );
    crate::test_complete!("test_explain_region_open_closed_region_has_no_reasons");
}
/// A region kept open by a child region, a running task, and a held
/// obligation must surface all three as reasons, matching recommendations,
/// and mention them in the rendered explanation.
#[test]
fn test_explain_region_open_reports_children_tasks_obligations() {
    init_test("test_explain_region_open_reports_children_tasks_obligations");
    let mut state = RuntimeState::new();
    let root = state.create_root_region(Budget::INFINITE);
    let child = insert_child_region(&mut state, root);
    let task_id = insert_task(&mut state, root, TaskState::Running);
    let task = state.task_mut(task_id).expect("task missing");
    // Non-zero poll count so the TaskRunning reason carries it through.
    task.total_polls = 7;
    let obligation_id = insert_obligation(
        &mut state,
        root,
        task_id,
        ObligationKind::SendPermit,
        Time::from_millis(10),
    );
    let diagnostics = Diagnostics::new(Arc::new(state));
    let explanation = diagnostics.explain_region_open(root);
    // Scan the reasons for each of the three expected kinds.
    let mut saw_child = false;
    let mut saw_task = false;
    let mut saw_obligation = false;
    for reason in &explanation.reasons {
        match reason {
            Reason::ChildRegionOpen { child_id, .. } if *child_id == child => {
                saw_child = true;
            }
            Reason::TaskRunning {
                task_id: id,
                poll_count,
                ..
            } if *id == task_id && *poll_count == 7 => {
                saw_task = true;
            }
            Reason::ObligationHeld {
                obligation_id: id,
                holder_task,
                ..
            } if *id == obligation_id && *holder_task == task_id => {
                saw_obligation = true;
            }
            _ => {}
        }
    }
    crate::assert_with_log!(saw_child, "child reason", true, saw_child);
    crate::assert_with_log!(saw_task, "task reason", true, saw_task);
    crate::assert_with_log!(saw_obligation, "obligation reason", true, saw_obligation);
    // Each reason kind should carry a matching recommendation string.
    let recs = &explanation.recommendations;
    let has_child_rec = recs.iter().any(|r| r.contains("child regions"));
    let has_task_rec = recs.iter().any(|r| r.contains("live tasks"));
    let has_obligation_rec = recs.iter().any(|r| r.contains("obligations"));
    crate::assert_with_log!(has_child_rec, "child rec", true, has_child_rec);
    crate::assert_with_log!(has_task_rec, "task rec", true, has_task_rec);
    crate::assert_with_log!(
        has_obligation_rec,
        "obligation rec",
        true,
        has_obligation_rec
    );
    // The Display rendering should mention children and obligations too.
    let rendered = explanation.to_string();
    crate::assert_with_log!(
        rendered.contains("child region"),
        "display includes child",
        true,
        rendered.contains("child region")
    );
    crate::assert_with_log!(
        rendered.contains("obligation"),
        "display includes obligation",
        true,
        rendered.contains("obligation")
    );
    crate::test_complete!("test_explain_region_open_reports_children_tasks_obligations");
}
/// Explaining a mid-level region must report its own open child (the
/// grandchild of the root), i.e. reasons reference immediate children.
#[test]
fn test_explain_region_open_nested_child_reports_immediate_child() {
    init_test("test_explain_region_open_nested_child_reports_immediate_child");
    let mut state = RuntimeState::new();
    let root = state.create_root_region(Budget::INFINITE);
    let child = insert_child_region(&mut state, root);
    let grandchild = insert_child_region(&mut state, child);
    let diagnostics = Diagnostics::new(Arc::new(state));
    // Explain the middle region: its open child is `grandchild`.
    let explanation = diagnostics.explain_region_open(child);
    let saw_grandchild = explanation.reasons.iter().any(|reason| {
        matches!(
            reason,
            Reason::ChildRegionOpen { child_id, .. } if *child_id == grandchild
        )
    });
    crate::assert_with_log!(saw_grandchild, "grandchild reason", true, saw_grandchild);
    crate::test_complete!("test_explain_region_open_nested_child_reports_immediate_child");
}
/// A running task whose wake state has been notified must classify as
/// `AwaitingSchedule`, and its registered waiters must appear in the details.
#[test]
fn test_explain_task_blocked_running_notified_reports_schedule() {
    init_test("test_explain_task_blocked_running_notified_reports_schedule");
    let mut state = RuntimeState::new();
    let root = state.create_root_region(Budget::INFINITE);
    let task_id = insert_task(&mut state, root, TaskState::Running);
    let task = state.task_mut(task_id).expect("task missing");
    // Notify the wake state so the task is runnable-but-not-yet-scheduled.
    let notified = task.wake_state.notify();
    crate::assert_with_log!(notified, "wake notified", true, notified);
    task.waiters.push(TaskId::new_for_test(77, 0));
    let diagnostics = Diagnostics::new(Arc::new(state));
    let explanation = diagnostics.explain_task_blocked(task_id);
    crate::assert_with_log!(
        matches!(explanation.block_reason, BlockReason::AwaitingSchedule),
        "awaiting schedule",
        true,
        matches!(explanation.block_reason, BlockReason::AwaitingSchedule)
    );
    let has_waiters = explanation.details.iter().any(|d| d.contains("waiters"));
    crate::assert_with_log!(has_waiters, "waiters detail", true, has_waiters);
    crate::test_complete!("test_explain_task_blocked_running_notified_reports_schedule");
}
// A task in `CancelRequested` must be explained with the originating cancel
// kind and message, and its Display output must mention the cancel request.
#[test]
fn test_explain_task_blocked_cancel_requested_includes_reason() {
init_test("test_explain_task_blocked_cancel_requested_includes_reason");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
// User-initiated cancel with message "stop"; the cleanup budget is derived
// from the reason itself.
let reason = CancelReason::user("stop");
let cleanup_budget = reason.cleanup_budget();
let task_id = insert_task(
&mut state,
root,
TaskState::CancelRequested {
reason,
cleanup_budget,
},
);
let diagnostics = Diagnostics::new(Arc::new(state));
let explanation = diagnostics.explain_task_blocked(task_id);
// The block reason must carry CancelKind::User and a non-empty message.
let matches_reason = matches!(
explanation.block_reason,
BlockReason::CancelRequested {
reason: CancelReasonInfo {
kind: CancelKind::User,
message: Some(_)
}
}
);
crate::assert_with_log!(matches_reason, "cancel requested", true, matches_reason);
let rendered = explanation.to_string();
crate::assert_with_log!(
rendered.contains("cancel requested"),
"display includes cancel",
true,
rendered.contains("cancel requested")
);
crate::test_complete!("test_explain_task_blocked_cancel_requested_includes_reason");
}
// A task that already finished is not blocked: explaining it must report
// `BlockReason::Completed`.
#[test]
fn test_explain_task_blocked_completed_reports_completed() {
init_test("test_explain_task_blocked_completed_reports_completed");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let task_id = insert_task(&mut state, root, TaskState::Completed(Outcome::Ok(())));
let diagnostics = Diagnostics::new(Arc::new(state));
let explanation = diagnostics.explain_task_blocked(task_id);
crate::assert_with_log!(
matches!(explanation.block_reason, BlockReason::Completed),
"completed",
true,
matches!(explanation.block_reason, BlockReason::Completed)
);
crate::test_complete!("test_explain_task_blocked_completed_reports_completed");
}
// Leaked obligations are reported sorted by region (root before child) and
// aged against the timer-driver clock.
#[test]
fn test_find_leaked_obligations_sorted_and_aged() {
init_test("test_find_leaked_obligations_sorted_and_aged");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let child = insert_child_region(&mut state, root);
// Virtual clock pinned at t=100ms; ages are measured from this "now".
let clock = Arc::new(VirtualClock::starting_at(Time::from_millis(100)));
state.set_timer_driver(TimerDriverHandle::with_virtual_clock(Arc::clone(&clock)));
let root_task = insert_task(&mut state, root, TaskState::Running);
let child_task = insert_task(&mut state, child, TaskState::Running);
// Root obligation created at t=10ms (expected age 90ms), child obligation
// at t=20ms (expected age 80ms).
let root_ob = insert_obligation(
&mut state,
root,
root_task,
ObligationKind::Ack,
Time::from_millis(10),
);
let child_ob = insert_obligation(
&mut state,
child,
child_task,
ObligationKind::Lease,
Time::from_millis(20),
);
let diagnostics = Diagnostics::new(Arc::new(state));
let leaks = diagnostics.find_leaked_obligations();
crate::assert_with_log!(leaks.len() == 2, "two leaks", 2usize, leaks.len());
// Sorted output: root region's leak first, child region's second.
crate::assert_with_log!(
leaks[0].region_id == root,
"root first",
true,
leaks[0].region_id == root
);
crate::assert_with_log!(
leaks[1].region_id == child,
"child second",
true,
leaks[1].region_id == child
);
crate::assert_with_log!(
leaks[0].obligation_id == root_ob,
"root obligation id",
true,
leaks[0].obligation_id == root_ob
);
crate::assert_with_log!(
leaks[1].obligation_id == child_ob,
"child obligation id",
true,
leaks[1].obligation_id == child_ob
);
// Age = clock now (100ms) - creation time (10ms / 20ms).
let root_age_ms = leaks[0].age.as_millis();
let child_age_ms = leaks[1].age.as_millis();
crate::assert_with_log!(root_age_ms == 90, "root age", 90u128, root_age_ms);
crate::assert_with_log!(child_age_ms == 80, "child age", 80u128, child_age_ms);
crate::test_complete!("test_find_leaked_obligations_sorted_and_aged");
}
// With no timer driver installed, obligation ages must fall back to the
// runtime's own `state.now` clock (250ms - 10ms creation = 240ms).
#[test]
fn test_find_leaked_obligations_uses_state_clock_without_timer_driver() {
init_test("test_find_leaked_obligations_uses_state_clock_without_timer_driver");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let task_id = insert_task(&mut state, root, TaskState::Running);
// No set_timer_driver() call: the fallback path is exercised.
state.now = Time::from_millis(250);
let obligation_id = insert_obligation(
&mut state,
root,
task_id,
ObligationKind::Lease,
Time::from_millis(10),
);
let diagnostics = Diagnostics::new(Arc::new(state));
let leaks = diagnostics.find_leaked_obligations();
crate::assert_with_log!(leaks.len() == 1, "single leak", 1usize, leaks.len());
crate::assert_with_log!(
leaks[0].obligation_id == obligation_id,
"obligation id preserved",
true,
leaks[0].obligation_id == obligation_id
);
let age_ms = leaks[0].age.as_millis();
crate::assert_with_log!(age_ms == 240, "age uses state clock", 240u128, age_ms);
crate::test_complete!("test_find_leaked_obligations_uses_state_clock_without_timer_driver");
}
// A moved `Reason` still Debug-prints its variant name.
#[test]
fn reason_debug_clone() {
    let original = Reason::RegionNotFound;
    let moved = original;
    let rendered = format!("{moved:?}");
    assert!(rendered.contains("RegionNotFound"));
}
// Every `Reason` variant renders a human-readable Display description
// containing the expected key phrases.
#[test]
fn reason_display_all_variants() {
    let not_found = Reason::RegionNotFound;
    assert!(not_found.to_string().contains("not found"));

    let child_open = Reason::ChildRegionOpen {
        child_id: RegionId::new_for_test(1, 0),
        child_state: RegionState::Open,
    };
    assert!(child_open.to_string().contains("child region"));

    let running = Reason::TaskRunning {
        task_id: TaskId::new_for_test(1, 0),
        task_state: "Running".into(),
        poll_count: 5,
    };
    let running_text = running.to_string();
    assert!(running_text.contains("task"));
    assert!(running_text.contains("polls=5"));

    let held = Reason::ObligationHeld {
        obligation_id: ObligationId::new_for_test(1, 0),
        obligation_type: "Lease".into(),
        holder_task: TaskId::new_for_test(2, 0),
    };
    let held_text = held.to_string();
    assert!(held_text.contains("obligation"));
    assert!(held_text.contains("Lease"));
}
// A moved `RegionOpenExplanation` still Debug-prints its type name.
#[test]
fn region_open_explanation_debug_clone() {
    let original = RegionOpenExplanation {
        region_id: RegionId::new_for_test(1, 0),
        region_state: Some(RegionState::Open),
        reasons: vec![Reason::RegionNotFound],
        recommendations: vec!["check it".into()],
    };
    let moved = original;
    assert!(format!("{moved:?}").contains("RegionOpenExplanation"));
}
// Display output mentions the open state and echoes the recommendations.
#[test]
fn region_open_explanation_display() {
    let explanation = RegionOpenExplanation {
        region_id: RegionId::new_for_test(1, 0),
        region_state: Some(RegionState::Open),
        reasons: vec![Reason::RegionNotFound],
        recommendations: vec!["fix it".into()],
    };
    let rendered = explanation.to_string();
    for needle in ["still open", "fix it"] {
        assert!(rendered.contains(needle));
    }
}
// A moved `TaskBlockedExplanation` still Debug-prints its type name.
#[test]
fn task_blocked_explanation_debug_clone() {
    let original = TaskBlockedExplanation {
        task_id: TaskId::new_for_test(1, 0),
        block_reason: BlockReason::NotStarted,
        details: vec!["detail".into()],
        recommendations: vec!["wait".into()],
    };
    let moved = original;
    assert!(format!("{moved:?}").contains("TaskBlockedExplanation"));
}
// Display output says the task is blocked and names the block reason.
#[test]
fn task_blocked_explanation_display() {
    let explanation = TaskBlockedExplanation {
        task_id: TaskId::new_for_test(1, 0),
        block_reason: BlockReason::AwaitingSchedule,
        details: vec!["pending wake".into()],
        recommendations: vec!["wait for scheduler".into()],
    };
    let rendered = explanation.to_string();
    for needle in ["blocked", "awaiting schedule"] {
        assert!(rendered.contains(needle));
    }
}
// A moved `BlockReason` still Debug-prints its variant name.
#[test]
fn block_reason_debug_clone() {
    let original = BlockReason::TaskNotFound;
    let moved = original;
    let rendered = format!("{moved:?}");
    assert!(rendered.contains("TaskNotFound"));
}
// Every `BlockReason` variant must produce a non-empty Display string.
#[test]
fn block_reason_display_all_variants() {
    // Helper: build a user-kind cancel info with an optional message.
    let user_cancel = |message: Option<String>| CancelReasonInfo {
        kind: CancelKind::User,
        message,
    };
    let variants = [
        BlockReason::TaskNotFound,
        BlockReason::NotStarted,
        BlockReason::AwaitingSchedule,
        BlockReason::AwaitingFuture {
            description: "channel recv".into(),
        },
        BlockReason::CancelRequested {
            reason: user_cancel(Some("stop".into())),
        },
        BlockReason::RunningCleanup {
            reason: user_cancel(None),
            polls_remaining: 10,
        },
        BlockReason::Finalizing {
            reason: user_cancel(None),
            polls_remaining: 5,
        },
        BlockReason::Completed,
    ];
    for variant in &variants {
        assert!(!variant.to_string().is_empty());
    }
}
// A moved `CancellationExplanation` still Debug-prints its type name.
#[test]
fn cancellation_explanation_debug_clone() {
    let step = CancellationStep {
        region_id: RegionId::new_for_test(1, 0),
        kind: CancelKind::User,
    };
    let original = CancellationExplanation {
        kind: CancelKind::User,
        message: Some("timeout".into()),
        propagation_path: vec![step],
    };
    let moved = original;
    assert!(format!("{moved:?}").contains("CancellationExplanation"));
}
// A moved `CancellationStep` still Debug-prints its type name.
#[test]
fn cancellation_step_debug_clone() {
    let original = CancellationStep {
        region_id: RegionId::new_for_test(1, 0),
        kind: CancelKind::User,
    };
    let moved = original;
    let rendered = format!("{moved:?}");
    assert!(rendered.contains("CancellationStep"));
}
// Debug names the type; Display includes the message when present and is
// still non-empty when the message is absent.
#[test]
fn cancel_reason_info_debug_clone_display() {
    let with_message = CancelReasonInfo {
        kind: CancelKind::User,
        message: Some("stop".into()),
    };
    let cloned = with_message.clone();
    assert!(format!("{cloned:?}").contains("CancelReasonInfo"));
    assert!(with_message.to_string().contains("stop"));

    let without_message = CancelReasonInfo {
        kind: CancelKind::User,
        message: None,
    };
    assert!(!without_message.to_string().is_empty());
}
// A moved `ObligationLeak` still Debug-prints its type name.
#[test]
fn obligation_leak_debug_clone() {
    let original = ObligationLeak {
        obligation_id: ObligationId::new_for_test(1, 0),
        obligation_type: "Ack".into(),
        holder_task: Some(TaskId::new_for_test(2, 0)),
        region_id: RegionId::new_for_test(1, 0),
        age: std::time::Duration::from_secs(60),
    };
    let moved = original;
    assert!(format!("{moved:?}").contains("ObligationLeak"));
}
// A two-task mutual wait (t1 waits on t2 and vice versa) must be detected as
// a trapped cycle containing both tasks, at critical severity.
//
// Fix: the two waiter-registration statements were jammed onto a single line;
// split them to match the formatting of the sibling deadlock tests below.
#[test]
fn directional_deadlock_cycle_detection_reports_critical() {
    let mut state = RuntimeState::new();
    let root = state.create_root_region(Budget::INFINITE);
    let t1 = insert_task(&mut state, root, TaskState::Running);
    let t2 = insert_task(&mut state, root, TaskState::Running);
    // Wire the mutual wait: each task waits on the other.
    state.task_mut(t1).expect("t1").waiters.push(t2);
    state.task_mut(t2).expect("t2").waiters.push(t1);
    let diagnostics = Diagnostics::new(Arc::new(state));
    let report = diagnostics.analyze_directional_deadlock();
    assert_eq!(report.severity, DeadlockSeverity::Critical);
    assert!(!report.cycles.is_empty());
    // The cycle must be marked trapped and must include both tasks.
    assert!(report.cycles[0].trapped);
    assert!(report.cycles[0].tasks.contains(&t1));
    assert!(report.cycles[0].tasks.contains(&t2));
}
// A trapped two-task wait cycle must drive the structural-health analysis to
// the `Deadlocked` classification.
#[test]
fn structural_health_reports_deadlocked_for_trapped_cycle() {
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let t1 = insert_task(&mut state, root, TaskState::Running);
let t2 = insert_task(&mut state, root, TaskState::Running);
// Mutual wait: each task waits on the other, with no external waker.
state.task_mut(t1).expect("t1").waiters.push(t2);
state.task_mut(t2).expect("t2").waiters.push(t1);
let diagnostics = Diagnostics::new(Arc::new(state));
let report = diagnostics.analyze_structural_health();
assert!(matches!(
report.classification,
crate::observability::spectral_health::HealthClassification::Deadlocked
));
}
// When the runtime contains a wait cycle, explaining the open region should
// include a "Directional deadlock risk" recommendation.
#[test]
fn explain_region_open_includes_directional_deadlock_recommendation() {
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let t1 = insert_task(&mut state, root, TaskState::Running);
let t2 = insert_task(&mut state, root, TaskState::Running);
// Mutual wait between the two tasks forms the deadlock cycle.
state.task_mut(t1).expect("t1").waiters.push(t2);
state.task_mut(t2).expect("t2").waiters.push(t1);
let diagnostics = Diagnostics::new(Arc::new(state));
let explanation = diagnostics.explain_region_open(root);
assert!(
explanation
.recommendations
.iter()
.any(|r| r.contains("Directional deadlock risk")),
"expected directional deadlock recommendation"
);
}
// Both contract lists must be strictly ascending — i.e. sorted with no
// duplicate identifiers.
#[test]
fn advanced_observability_contract_has_sorted_dimensions_and_classes() {
    // Strictly increasing adjacent pairs <=> sorted and duplicate-free.
    fn strictly_sorted(items: &[&str]) -> bool {
        items.windows(2).all(|pair| pair[0] < pair[1])
    }
    let contract = advanced_observability_contract();
    let classes: Vec<&str> = contract
        .event_classes
        .iter()
        .map(|item| item.class_id.as_str())
        .collect();
    assert!(strictly_sorted(&classes));
    let dimensions: Vec<&str> = contract
        .troubleshooting_dimensions
        .iter()
        .map(|item| item.dimension.as_str())
        .collect();
    assert!(strictly_sorted(&dimensions));
}
// A well-formed execution/command_start/success event maps to the
// command-lifecycle class at info severity, with no conflicts and the
// operator-action troubleshooting dimension attached.
#[test]
fn classify_baseline_log_event_maps_known_event() {
    let event = BaselineLogEvent {
        flow_id: "execution",
        event_kind: "command_start",
        outcome_class: "success",
    };
    let classified = classify_baseline_log_event(event).expect("classification should succeed");
    assert_eq!(classified.event_class, AdvancedEventClass::CommandLifecycle);
    assert_eq!(classified.severity, AdvancedSeverity::Info);
    assert!(classified.conflicts.is_empty());
    let has_operator_dimension = classified
        .dimensions
        .contains(&TroubleshootingDimension::OperatorAction);
    assert!(has_operator_dimension);
}
// An integration event arriving on the execution flow still classifies, but
// is escalated to critical with a flow/event mismatch conflict.
#[test]
fn classify_baseline_log_event_detects_flow_event_conflict() {
    let event = BaselineLogEvent {
        flow_id: "execution",
        event_kind: "integration_sync",
        outcome_class: "success",
    };
    let classified =
        classify_baseline_log_event(event).expect("classification should succeed with conflict");
    assert_eq!(classified.severity, AdvancedSeverity::Critical);
    let saw_mismatch = classified.conflicts.iter().any(|conflict| {
        matches!(
            conflict,
            AdvancedClassificationConflict::FlowEventMismatch { .. }
        )
    });
    assert!(saw_mismatch);
}
// An integration_error event with a "success" outcome classifies as
// integration reliability at error severity, and flags the outcome/event
// mismatch as a conflict.
#[test]
fn classify_baseline_log_event_detects_outcome_event_conflict() {
    let event = BaselineLogEvent {
        flow_id: "integration",
        event_kind: "integration_error",
        outcome_class: "success",
    };
    let classified =
        classify_baseline_log_event(event).expect("classification should succeed with conflict");
    assert_eq!(
        classified.event_class,
        AdvancedEventClass::IntegrationReliability
    );
    assert_eq!(classified.severity, AdvancedSeverity::Error);
    let saw_mismatch = classified.conflicts.iter().any(|conflict| {
        matches!(
            conflict,
            AdvancedClassificationConflict::OutcomeEventMismatch { .. }
        )
    });
    assert!(saw_mismatch);
}
// Classifying the same stream twice must yield identical, non-empty results,
// and every classified entry must carry a narrative.
#[test]
fn classify_baseline_log_events_is_deterministic() {
    let make_event = |flow_id, event_kind, outcome_class| BaselineLogEvent {
        flow_id,
        event_kind,
        outcome_class,
    };
    let stream = vec![
        make_event("execution", "command_start", "success"),
        make_event("execution", "verification_summary", "failed"),
        make_event("replay", "replay_complete", "cancelled"),
    ];
    let first = classify_baseline_log_events(&stream).expect("stream classification should pass");
    let second = classify_baseline_log_events(&stream).expect("stream classification should pass");
    assert_eq!(first, second);
    assert!(!first.is_empty());
    assert!(first.iter().all(|entry| !entry.narrative.is_empty()));
}
// An unrecognized flow_id must be rejected with a descriptive error.
#[test]
fn classify_baseline_log_event_rejects_unknown_tokens() {
    let event = BaselineLogEvent {
        flow_id: "unknown",
        event_kind: "command_start",
        outcome_class: "success",
    };
    let err = classify_baseline_log_event(event).expect_err("unknown flow must be rejected");
    assert!(err.contains("unknown flow_id"));
}
// Snapshot: the happy-path structured report (one completed task, no region
// section, no leaks), after scrubbing volatile timestamps.
#[test]
fn structured_diagnostic_report_snapshot_happy_path() {
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let task_id = insert_task(&mut state, root, TaskState::Completed(Outcome::Ok(())));
let diagnostics = Diagnostics::new(Arc::new(state));
let rendered = render_structured_diagnostic_report(
"happy_path",
"2026-04-20T22:00:00Z",
None,
Some(&diagnostics.explain_task_blocked(task_id)),
&[],
);
// Scrub timestamps so the snapshot comparison is deterministic.
let scrubbed = scrub_diagnostic_report_timestamps(&rendered);
assert_diagnostic_report_snapshot("observability_diagnostics_happy_path", &scrubbed);
}
// Snapshot: the combined v2 structured report over a healthy runtime and a
// degraded shutdown-drain scenario, with timestamps scrubbed from each
// section before combining.
//
// Fix: several `&degraded_…` identifiers had been mangled into `°raded_…`
// (an HTML-entity corruption of `&deg`), which does not compile; restored.
#[test]
fn structured_diagnostic_report_snapshot_v2_happy_and_degraded() {
    // Happy section: a single successfully completed task.
    let mut happy_state = RuntimeState::new();
    let happy_root = happy_state.create_root_region(Budget::INFINITE);
    let happy_task = insert_task(
        &mut happy_state,
        happy_root,
        TaskState::Completed(Outcome::Ok(())),
    );
    let happy_diagnostics = Diagnostics::new(Arc::new(happy_state));
    let happy_rendered = render_structured_diagnostic_report(
        "happy_path",
        "2026-04-20T22:00:00Z",
        None,
        Some(&happy_diagnostics.explain_task_blocked(happy_task)),
        &[],
    );
    let happy_scrubbed = scrub_diagnostic_report_timestamps(&happy_rendered);
    // Degraded section: shutdown drain with an open child region, a
    // cancel-requested child task, and leaked obligations in both regions.
    let mut degraded_state = RuntimeState::new();
    let degraded_root = degraded_state.create_root_region(Budget::INFINITE);
    let degraded_child = insert_child_region(&mut degraded_state, degraded_root);
    let clock = Arc::new(VirtualClock::starting_at(Time::from_millis(5_000)));
    degraded_state.set_timer_driver(TimerDriverHandle::with_virtual_clock(Arc::clone(&clock)));
    let degraded_root_task =
        insert_task(&mut degraded_state, degraded_root, TaskState::Running);
    degraded_state
        .task_mut(degraded_root_task)
        .expect("root task missing")
        .total_polls = 9;
    let degraded_child_task = insert_task(
        &mut degraded_state,
        degraded_child,
        TaskState::CancelRequested {
            reason: CancelReason::shutdown().with_message("node draining"),
            cleanup_budget: Budget::new().with_poll_quota(16),
        },
    );
    let _degraded_root_obligation = insert_obligation(
        &mut degraded_state,
        degraded_root,
        degraded_root_task,
        ObligationKind::Ack,
        Time::from_millis(500),
    );
    let _degraded_child_obligation = insert_obligation(
        &mut degraded_state,
        degraded_child,
        degraded_child_task,
        ObligationKind::Lease,
        Time::from_millis(750),
    );
    let degraded_diagnostics = Diagnostics::new(Arc::new(degraded_state));
    let mut degraded_task = degraded_diagnostics.explain_task_blocked(degraded_child_task);
    degraded_task
        .details
        .insert(0, "observed_at: 2026-04-20T22:00:05Z".to_string());
    degraded_task
        .recommendations
        .push("Continue draining child region tasks before sealing shutdown.".to_string());
    let degraded_leaks = degraded_diagnostics.find_leaked_obligations();
    let degraded_rendered = render_structured_diagnostic_report(
        "shutdown_drain",
        "2026-04-20T22:00:05Z",
        Some(&degraded_diagnostics.explain_region_open(degraded_root)),
        Some(&degraded_task),
        &degraded_leaks,
    );
    let degraded_scrubbed = scrub_diagnostic_report_timestamps(&degraded_rendered);
    // Combine both scrubbed sections into the v2 report and snapshot it.
    let rendered = render_structured_diagnostic_report_v2(&[
        ("happy", &happy_scrubbed),
        ("degraded", &degraded_scrubbed),
    ]);
    assert_diagnostic_report_snapshot(
        "observability_diagnostics_structured_report_v2",
        &rendered,
    );
}
// Snapshot: the v3 structured report combining three scenarios — healthy,
// degraded shutdown drain, and a critical trapped deadlock — each with its
// own resource-accounting section.
//
// Fix: several `&degraded_…` identifiers had been mangled into `°raded_…`
// (an HTML-entity corruption of `&deg`), which does not compile; restored.
#[test]
fn structured_diagnostic_report_snapshot_v3_happy_degraded_and_critical() {
    // Happy section: a single successfully completed task.
    let mut happy_state = RuntimeState::new();
    let happy_root = happy_state.create_root_region(Budget::INFINITE);
    let happy_task = insert_task(
        &mut happy_state,
        happy_root,
        TaskState::Completed(Outcome::Ok(())),
    );
    let happy_diagnostics = Diagnostics::new(Arc::new(happy_state));
    let happy_rendered = render_structured_diagnostic_report(
        "happy_path",
        "2026-04-21T09:10:00Z",
        None,
        Some(&happy_diagnostics.explain_task_blocked(happy_task)),
        &[],
    );
    let happy_scrubbed = scrub_diagnostic_report_timestamps(&happy_rendered);
    // Degraded section: shutdown drain with an open child region, a
    // cancel-requested child task, and leaked obligations in both regions.
    let mut degraded_state = RuntimeState::new();
    let degraded_root = degraded_state.create_root_region(Budget::INFINITE);
    let degraded_child = insert_child_region(&mut degraded_state, degraded_root);
    let degraded_clock = Arc::new(VirtualClock::starting_at(Time::from_millis(5_000)));
    degraded_state.set_timer_driver(TimerDriverHandle::with_virtual_clock(Arc::clone(
        &degraded_clock,
    )));
    let degraded_root_task =
        insert_task(&mut degraded_state, degraded_root, TaskState::Running);
    degraded_state
        .task_mut(degraded_root_task)
        .expect("degraded root task missing")
        .total_polls = 9;
    let degraded_child_task = insert_task(
        &mut degraded_state,
        degraded_child,
        TaskState::CancelRequested {
            reason: CancelReason::shutdown().with_message("node draining"),
            cleanup_budget: Budget::new().with_poll_quota(16),
        },
    );
    let _degraded_root_obligation = insert_obligation(
        &mut degraded_state,
        degraded_root,
        degraded_root_task,
        ObligationKind::Ack,
        Time::from_millis(500),
    );
    let _degraded_child_obligation = insert_obligation(
        &mut degraded_state,
        degraded_child,
        degraded_child_task,
        ObligationKind::Lease,
        Time::from_millis(750),
    );
    let degraded_diagnostics = Diagnostics::new(Arc::new(degraded_state));
    let mut degraded_task = degraded_diagnostics.explain_task_blocked(degraded_child_task);
    degraded_task
        .details
        .insert(0, "observed_at: 2026-04-21T09:10:05Z".to_string());
    degraded_task
        .recommendations
        .push("Continue draining child region tasks before sealing shutdown.".to_string());
    let degraded_leaks = degraded_diagnostics.find_leaked_obligations();
    let degraded_rendered = render_structured_diagnostic_report(
        "shutdown_drain",
        "2026-04-21T09:10:05Z",
        Some(&degraded_diagnostics.explain_region_open(degraded_root)),
        Some(&degraded_task),
        &degraded_leaks,
    );
    let degraded_scrubbed = scrub_diagnostic_report_timestamps(&degraded_rendered);
    // Critical section: two tasks in a trapped mutual-wait cycle plus a
    // leaked obligation.
    let mut critical_state = RuntimeState::new();
    let critical_root = critical_state.create_root_region(Budget::INFINITE);
    let critical_clock = Arc::new(VirtualClock::starting_at(Time::from_millis(1_500)));
    critical_state.set_timer_driver(TimerDriverHandle::with_virtual_clock(Arc::clone(
        &critical_clock,
    )));
    let critical_t1 = insert_task(&mut critical_state, critical_root, TaskState::Running);
    let critical_t2 = insert_task(&mut critical_state, critical_root, TaskState::Running);
    critical_state
        .task_mut(critical_t1)
        .expect("critical task 1 missing")
        .waiters
        .push(critical_t2);
    critical_state
        .task_mut(critical_t2)
        .expect("critical task 2 missing")
        .waiters
        .push(critical_t1);
    let _critical_obligation = insert_obligation(
        &mut critical_state,
        critical_root,
        critical_t1,
        ObligationKind::Ack,
        Time::from_millis(250),
    );
    let critical_diagnostics = Diagnostics::new(Arc::new(critical_state));
    let mut critical_task = critical_diagnostics.explain_task_blocked(critical_t1);
    critical_task
        .details
        .insert(0, "observed_at: 2026-04-21T09:10:09Z".to_string());
    critical_task
        .recommendations
        .push("Break the trapped wait cycle before allowing retries.".to_string());
    let critical_leaks = critical_diagnostics.find_leaked_obligations();
    let critical_rendered = render_structured_diagnostic_report(
        "critical_deadlock",
        "2026-04-21T09:10:09Z",
        Some(&critical_diagnostics.explain_region_open(critical_root)),
        Some(&critical_task),
        &critical_leaks,
    );
    let critical_scrubbed = scrub_diagnostic_report_timestamps(&critical_rendered);
    // Assemble all three scrubbed sections into the v3 report and snapshot it.
    let rendered = render_structured_diagnostic_report_v3(&[
        DiagnosticReportV3Section {
            label: "happy",
            status: "passing",
            accounting: diagnostic_resource_accounting(&happy_diagnostics, 0),
            rendered: &happy_scrubbed,
        },
        DiagnosticReportV3Section {
            label: "degraded",
            status: "degraded",
            accounting: diagnostic_resource_accounting(
                &degraded_diagnostics,
                degraded_leaks.len(),
            ),
            rendered: &degraded_scrubbed,
        },
        DiagnosticReportV3Section {
            label: "critical",
            status: "critical",
            accounting: diagnostic_resource_accounting(
                &critical_diagnostics,
                critical_leaks.len(),
            ),
            rendered: &critical_scrubbed,
        },
    ]);
    assert_diagnostic_report_snapshot(
        "observability_diagnostics_structured_report_v3",
        &rendered,
    );
}
// Snapshot: a synthetic rate-limited task explanation (no runtime state
// needed), rendered and scrubbed of timestamps.
#[test]
fn structured_diagnostic_report_snapshot_rate_limited() {
    let details = vec![
        "next_retry_at: 2026-04-20T22:00:02Z".to_string(),
        "queue_depth: 3".to_string(),
        "limiter: outbound_http".to_string(),
    ];
    let task = TaskBlockedExplanation {
        task_id: TaskId::new_for_test(11, 0),
        block_reason: BlockReason::AwaitingFuture {
            description: "rate-limited by token bucket".to_string(),
        },
        details,
        recommendations: vec!["Wait for the limiter budget to replenish.".to_string()],
    };
    let rendered = render_structured_diagnostic_report(
        "rate_limited",
        "2026-04-20T22:00:01Z",
        None,
        Some(&task),
        &[],
    );
    let scrubbed = scrub_diagnostic_report_timestamps(&rendered);
    assert_diagnostic_report_snapshot("observability_diagnostics_rate_limited", &scrubbed);
}
// Snapshot: an OOM-driven cancel (resource_unavailable) with synthetic
// memory-pressure details appended to the task explanation.
#[test]
fn structured_diagnostic_report_snapshot_oom() {
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let task_id = insert_task(
&mut state,
root,
TaskState::CancelRequested {
reason: CancelReason::resource_unavailable().with_message("oom"),
cleanup_budget: Budget::new().with_poll_quota(64),
},
);
let diagnostics = Diagnostics::new(Arc::new(state));
let mut task = diagnostics.explain_task_blocked(task_id);
// Enrich the explanation with the scenario-specific detail lines.
task.details
.insert(0, "observed_at: 2026-04-20T22:00:03Z".to_string());
task.details.push("headroom: 0.00".to_string());
task.details.push("allocator_lane: region_heap".to_string());
task.recommendations
.push("Relieve memory pressure before retrying.".to_string());
let rendered = render_structured_diagnostic_report(
"oom",
"2026-04-20T22:00:03Z",
Some(&diagnostics.explain_region_open(root)),
Some(&task),
&[],
);
let scrubbed = scrub_diagnostic_report_timestamps(&rendered);
assert_diagnostic_report_snapshot("observability_diagnostics_oom", &scrubbed);
}
// Snapshot: a deadline-exceeded finalizing task plus a leaked lease
// obligation, rendered with region, task, and leak sections.
#[test]
fn structured_diagnostic_report_snapshot_deadline_exceeded() {
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
// Virtual clock at 1.5s so the leaked obligation has a non-zero age.
let clock = Arc::new(VirtualClock::starting_at(Time::from_millis(1_500)));
state.set_timer_driver(TimerDriverHandle::with_virtual_clock(Arc::clone(&clock)));
let task_id = insert_task(
&mut state,
root,
TaskState::Finalizing {
reason: CancelReason::deadline().with_message("deadline exceeded"),
cleanup_budget: Budget::new().with_poll_quota(42),
},
);
let _obligation_id = insert_obligation(
&mut state,
root,
task_id,
ObligationKind::Lease,
Time::from_millis(250),
);
let diagnostics = Diagnostics::new(Arc::new(state));
let mut task = diagnostics.explain_task_blocked(task_id);
task.details
.insert(0, "deadline_at: 2026-04-20T22:00:04Z".to_string());
task.recommendations
.push("Inspect cleanup latency and tighten downstream budgets.".to_string());
let leaks = diagnostics.find_leaked_obligations();
let rendered = render_structured_diagnostic_report(
"deadline_exceeded",
"2026-04-20T22:00:04Z",
Some(&diagnostics.explain_region_open(root)),
Some(&task),
&leaks,
);
let scrubbed = scrub_diagnostic_report_timestamps(&rendered);
assert_diagnostic_report_snapshot("observability_diagnostics_deadline_exceeded", &scrubbed);
}
// Snapshot: a shutdown drain with an open child region, a cancel-requested
// child task, and leaked obligations in both regions.
#[test]
fn structured_diagnostic_report_snapshot_shutdown_drain() {
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let child = insert_child_region(&mut state, root);
// Virtual clock at 5s so the leaked obligations have non-zero ages.
let clock = Arc::new(VirtualClock::starting_at(Time::from_millis(5_000)));
state.set_timer_driver(TimerDriverHandle::with_virtual_clock(Arc::clone(&clock)));
let root_task = insert_task(&mut state, root, TaskState::Running);
state
.task_mut(root_task)
.expect("root task missing")
.total_polls = 9;
let child_task = insert_task(
&mut state,
child,
TaskState::CancelRequested {
reason: CancelReason::shutdown().with_message("node draining"),
cleanup_budget: Budget::new().with_poll_quota(16),
},
);
let _root_obligation = insert_obligation(
&mut state,
root,
root_task,
ObligationKind::Ack,
Time::from_millis(500),
);
let _child_obligation = insert_obligation(
&mut state,
child,
child_task,
ObligationKind::Lease,
Time::from_millis(750),
);
let diagnostics = Diagnostics::new(Arc::new(state));
let mut task = diagnostics.explain_task_blocked(child_task);
task.details
.insert(0, "observed_at: 2026-04-20T22:00:05Z".to_string());
task.recommendations
.push("Continue draining child region tasks before sealing shutdown.".to_string());
let leaks = diagnostics.find_leaked_obligations();
let rendered = render_structured_diagnostic_report(
"shutdown_drain",
"2026-04-20T22:00:05Z",
Some(&diagnostics.explain_region_open(root)),
Some(&task),
&leaks,
);
let scrubbed = scrub_diagnostic_report_timestamps(&rendered);
assert_diagnostic_report_snapshot("observability_diagnostics_shutdown_drain", &scrubbed);
}
// Snapshot: the JSON healthcheck rendering for three runtimes — a cleanly
// closed root ("passing"), a long chained wait path ("degraded"), and a
// trapped deadlock with a leaked obligation ("critical") — scrubbed of
// volatile fields before comparison.
//
// Fix: `&degraded_diagnostics` had been mangled into `°raded_diagnostics`
// (an HTML-entity corruption of `&deg`), which does not compile; restored.
#[test]
fn diagnostics_healthcheck_json_snapshot_scrubbed() {
    init_test("diagnostics_healthcheck_json_snapshot_scrubbed");
    // Passing: root region driven through close -> finalize -> complete.
    let mut passing_state = RuntimeState::new();
    let passing_root = passing_state.create_root_region(Budget::INFINITE);
    let passing_region = passing_state
        .region(passing_root)
        .expect("passing root missing");
    let did_close = passing_region.begin_close(None)
        && passing_region.begin_finalize()
        && passing_region.complete_close();
    assert!(did_close, "passing root should close cleanly");
    let passing = scrub_diagnostic_healthcheck_json(render_diagnostic_healthcheck_json(
        &Diagnostics::new(Arc::new(passing_state)),
        passing_root,
        "2026-04-21T08:30:00Z",
        4101,
    ));
    assert_eq!(
        passing.get("status").and_then(Value::as_str),
        Some("passing")
    );
    // Degraded: an 11-task chained wait path with a slow head task.
    let mut degraded_state = RuntimeState::new();
    let degraded_root = degraded_state.create_root_region(Budget::INFINITE);
    let degraded_tasks = insert_wait_path(&mut degraded_state, degraded_root, 11);
    degraded_state
        .task_mut(*degraded_tasks.first().expect("degraded path head"))
        .expect("degraded head task missing")
        .total_polls = 3;
    let degraded_diagnostics = Diagnostics::new(Arc::new(degraded_state));
    assert!(
        matches!(
            degraded_diagnostics
                .analyze_structural_health()
                .classification,
            HealthClassification::Degraded { .. }
        ),
        "expected degraded classification for chained wait path"
    );
    let degraded = scrub_diagnostic_healthcheck_json(render_diagnostic_healthcheck_json(
        &degraded_diagnostics,
        degraded_root,
        "2026-04-21T08:30:01Z",
        4102,
    ));
    assert_eq!(
        degraded.get("status").and_then(Value::as_str),
        Some("degraded")
    );
    // Critical: a trapped two-task wait cycle plus a leaked obligation.
    let mut critical_state = RuntimeState::new();
    let critical_root = critical_state.create_root_region(Budget::INFINITE);
    let clock = Arc::new(VirtualClock::starting_at(Time::from_millis(1_500)));
    critical_state.set_timer_driver(TimerDriverHandle::with_virtual_clock(clock));
    let t1 = insert_task(&mut critical_state, critical_root, TaskState::Running);
    let t2 = insert_task(&mut critical_state, critical_root, TaskState::Running);
    critical_state
        .task_mut(t1)
        .expect("critical t1")
        .waiters
        .push(t2);
    critical_state
        .task_mut(t2)
        .expect("critical t2")
        .waiters
        .push(t1);
    let _critical_obligation = insert_obligation(
        &mut critical_state,
        critical_root,
        t1,
        ObligationKind::Ack,
        Time::from_millis(250),
    );
    let critical_diagnostics = Diagnostics::new(Arc::new(critical_state));
    assert_eq!(
        critical_diagnostics.analyze_directional_deadlock().severity,
        DeadlockSeverity::Critical
    );
    let critical = scrub_diagnostic_healthcheck_json(render_diagnostic_healthcheck_json(
        &critical_diagnostics,
        critical_root,
        "2026-04-21T08:30:02Z",
        4103,
    ));
    assert_eq!(
        critical.get("status").and_then(Value::as_str),
        Some("critical")
    );
    // Snapshot all three scrubbed documents together.
    assert_diagnostic_healthcheck_snapshot(
        "observability_diagnostics_healthcheck_json",
        &json!({
            "passing": passing,
            "degraded": degraded,
            "critical": critical,
        }),
    );
}
// The contract's required log fields must not contain duplicate keys.
#[test]
fn tail_latency_taxonomy_contract_has_unique_required_keys() {
    let contract = tail_latency_taxonomy_contract();
    // `insert` returns false when the key was already present.
    let mut seen = std::collections::BTreeSet::new();
    for field in &contract.required_log_fields {
        assert!(seen.insert(field.key.as_str()));
    }
}
// The contract pins its version, names the unknown-latency bucket, requires
// it as a log field, and wires the "unknown" term to the same structured key.
#[test]
fn tail_latency_taxonomy_contract_includes_unknown_bucket_and_signals() {
    let contract = tail_latency_taxonomy_contract();
    assert_eq!(
        contract.contract_version,
        TAIL_LATENCY_TAXONOMY_CONTRACT_VERSION
    );
    assert_eq!(contract.unknown_bucket_key, "tail.unknown.unmeasured_ns");
    let bucket_is_required = contract
        .required_log_fields
        .iter()
        .any(|field| field.key == contract.unknown_bucket_key && field.required);
    assert!(bucket_is_required);
    let unknown_term_wired = contract.terms.iter().any(|term| {
        term.term_id == "unknown"
            && term.direct_duration_key == "tail.unknown.unmeasured_ns"
            && term
                .signals
                .iter()
                .any(|signal| signal.structured_log_key == "tail.unknown.unmeasured_ns")
    });
    assert!(unknown_term_wired);
}
// Every core signal in the contract must name a producer file that actually
// exists relative to the crate root.
#[test]
fn tail_latency_taxonomy_contract_core_signals_have_existing_files() {
    let contract = tail_latency_taxonomy_contract();
    let repo_root = std::path::Path::new(env!("CARGO_MANIFEST_DIR"));
    let core_signals = contract
        .terms
        .iter()
        .flat_map(|term| term.signals.iter())
        .filter(|signal| signal.core);
    for signal in core_signals {
        let producer = repo_root.join(&signal.producer_file);
        assert!(
            producer.exists(),
            "producer file must exist: {}",
            signal.producer_file
        );
    }
}
// Debug formatting of `Diagnostics` includes the struct name.
#[test]
fn diagnostics_debug() {
    let diagnostics = Diagnostics::new(Arc::new(RuntimeState::new()));
    let rendered = format!("{diagnostics:?}");
    assert!(rendered.contains("Diagnostics"));
}
// INTROSPECTION-CONF-001: repeated diagnostic queries over an unchanged
// runtime must return identical results — explain_region_open,
// explain_task_blocked, and find_leaked_obligations are each run three
// times and compared pairwise.
#[test]
fn introspection_conf_001_diagnostic_query_result_idempotence() {
init_test("introspection_conf_001_diagnostic_query_result_idempotence");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let task_id = insert_task(&mut state, root, TaskState::Running);
let diagnostics = Diagnostics::new(Arc::new(state));
// Region explanation: three queries, pairwise-equal ids and reason counts.
let explanation1 = diagnostics.explain_region_open(root);
let explanation2 = diagnostics.explain_region_open(root);
let explanation3 = diagnostics.explain_region_open(root);
crate::assert_with_log!(
explanation1.region_id == explanation2.region_id,
"region_id idempotent",
true,
explanation1.region_id == explanation2.region_id
);
crate::assert_with_log!(
explanation2.region_id == explanation3.region_id,
"region_id triple check",
true,
explanation2.region_id == explanation3.region_id
);
crate::assert_with_log!(
explanation1.reasons.len() == explanation2.reasons.len(),
"reasons count idempotent",
explanation1.reasons.len(),
explanation2.reasons.len()
);
crate::assert_with_log!(
explanation2.reasons.len() == explanation3.reasons.len(),
"reasons count triple check",
explanation2.reasons.len(),
explanation3.reasons.len()
);
// Task explanation: three queries, pairwise-equal task ids.
let task1 = diagnostics.explain_task_blocked(task_id);
let task2 = diagnostics.explain_task_blocked(task_id);
let task3 = diagnostics.explain_task_blocked(task_id);
crate::assert_with_log!(
task1.task_id == task2.task_id,
"task_id idempotent",
true,
task1.task_id == task2.task_id
);
crate::assert_with_log!(
task2.task_id == task3.task_id,
"task_id triple check",
true,
task2.task_id == task3.task_id
);
// Leak scan: three queries, pairwise-equal leak counts.
let leaks1 = diagnostics.find_leaked_obligations();
let leaks2 = diagnostics.find_leaked_obligations();
let leaks3 = diagnostics.find_leaked_obligations();
crate::assert_with_log!(
leaks1.len() == leaks2.len(),
"leaks count idempotent",
leaks1.len(),
leaks2.len()
);
crate::assert_with_log!(
leaks2.len() == leaks3.len(),
"leaks count triple check",
leaks2.len(),
leaks3.len()
);
crate::test_complete!("introspection_conf_001_diagnostic_query_result_idempotence");
}
// INTROSPECTION-CONF-002: diagnostic result ordering must be deterministic
// regardless of insertion order. Obligations are deliberately inserted out of
// order (ob1, ob3, ob2) to prove results are ordered by stable keys, not by
// insertion sequence.
#[test]
fn introspection_conf_002_deterministic_ordering_of_diagnostic_results() {
init_test("introspection_conf_002_deterministic_ordering_of_diagnostic_results");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let child1 = insert_child_region(&mut state, root);
let child2 = insert_child_region(&mut state, root);
let child3 = insert_child_region(&mut state, root);
let task1 = insert_task(&mut state, child1, TaskState::Running);
let task2 = insert_task(&mut state, child2, TaskState::Running);
let task3 = insert_task(&mut state, child3, TaskState::Running);
let clock = Arc::new(VirtualClock::starting_at(Time::from_millis(100)));
state.set_timer_driver(TimerDriverHandle::with_virtual_clock(clock));
// Intentionally out-of-order insertion: 1, then 3, then 2.
let _ob1 = insert_obligation(
&mut state,
child1,
task1,
ObligationKind::Ack,
Time::from_millis(10),
);
let _ob3 = insert_obligation(
&mut state,
child3,
task3,
ObligationKind::Lease,
Time::from_millis(30),
);
let _ob2 = insert_obligation(
&mut state,
child2,
task2,
ObligationKind::SendPermit,
Time::from_millis(20),
);
let diagnostics = Diagnostics::new(Arc::new(state));
// Three runs of leak detection must produce identically ordered results.
let leaks_run1 = diagnostics.find_leaked_obligations();
let leaks_run2 = diagnostics.find_leaked_obligations();
let leaks_run3 = diagnostics.find_leaked_obligations();
crate::assert_with_log!(
leaks_run1.len() == 3,
"leak count consistent",
3usize,
leaks_run1.len()
);
for i in 0..3 {
crate::assert_with_log!(
leaks_run1[i].obligation_id == leaks_run2[i].obligation_id,
&format!("obligation_id ordering run1==run2 idx {}", i),
true,
leaks_run1[i].obligation_id == leaks_run2[i].obligation_id
);
crate::assert_with_log!(
leaks_run2[i].obligation_id == leaks_run3[i].obligation_id,
&format!("obligation_id ordering run2==run3 idx {}", i),
true,
leaks_run2[i].obligation_id == leaks_run3[i].obligation_id
);
crate::assert_with_log!(
leaks_run1[i].region_id == leaks_run2[i].region_id,
&format!("region_id ordering run1==run2 idx {}", i),
true,
leaks_run1[i].region_id == leaks_run2[i].region_id
);
}
// Region-explanation reasons must also come back in a stable order.
let explanation1 = diagnostics.explain_region_open(root);
let explanation2 = diagnostics.explain_region_open(root);
crate::assert_with_log!(
explanation1.reasons.len() == explanation2.reasons.len(),
"reason count deterministic",
explanation1.reasons.len(),
explanation2.reasons.len()
);
for i in 0..explanation1.reasons.len().min(explanation2.reasons.len()) {
// Reasons carry project types; compare via their Debug rendering.
let reason1_desc = format!("{:?}", explanation1.reasons[i]);
let reason2_desc = format!("{:?}", explanation2.reasons[i]);
crate::assert_with_log!(
reason1_desc == reason2_desc,
&format!("reason ordering idx {}", i),
true,
reason1_desc == reason2_desc
);
}
crate::test_complete!(
"introspection_conf_002_deterministic_ordering_of_diagnostic_results"
);
}
// INTROSPECTION-CONF-003: diagnostic results must survive rendering — leak
// records keep their identifying fields, and Debug/Display output is non-empty
// and carries recognizable markers.
#[test]
fn introspection_conf_003_serialization_roundtrip_correctness() {
init_test("introspection_conf_003_serialization_roundtrip_correctness");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let task_id = insert_task(&mut state, root, TaskState::Running);
// Clock at 500ms, obligation created at 100ms => positive, computable age.
let clock = Arc::new(VirtualClock::starting_at(Time::from_millis(500)));
state.set_timer_driver(TimerDriverHandle::with_virtual_clock(clock));
let ob_id = insert_obligation(
&mut state,
root,
task_id,
ObligationKind::Ack,
Time::from_millis(100),
);
let diagnostics = Diagnostics::new(Arc::new(state));
let leaks = diagnostics.find_leaked_obligations();
crate::assert_with_log!(
!leaks.is_empty(),
"has leaks for serialization test",
true,
!leaks.is_empty()
);
// Each reported leak must preserve id, type, holder, region, and age.
for leak in &leaks {
crate::assert_with_log!(
leak.obligation_id == ob_id,
"obligation_id preserved",
true,
leak.obligation_id == ob_id
);
crate::assert_with_log!(
!leak.obligation_type.is_empty(),
"obligation_type serializable",
true,
!leak.obligation_type.is_empty()
);
crate::assert_with_log!(
leak.holder_task == Some(task_id),
"holder_task preserved",
true,
leak.holder_task == Some(task_id)
);
crate::assert_with_log!(
leak.region_id == root,
"region_id preserved",
true,
leak.region_id == root
);
crate::assert_with_log!(
leak.age.as_millis() > 0,
"age computed and serializable",
true,
leak.age.as_millis() > 0
);
// The Debug rendering must name the type and include the obligation kind.
let debug_repr = format!("{:?}", leak);
crate::assert_with_log!(
debug_repr.contains("ObligationLeak"),
"debug serialization includes type",
true,
debug_repr.contains("ObligationLeak")
);
crate::assert_with_log!(
debug_repr.contains(&leak.obligation_type),
"debug serialization includes obligation_type",
true,
debug_repr.contains(&leak.obligation_type)
);
}
// Display impls for region and task explanations must produce labeled text.
let explanation = diagnostics.explain_region_open(root);
let display_repr = format!("{}", explanation);
crate::assert_with_log!(
display_repr.contains("Region"),
"display contains region marker",
true,
display_repr.contains("Region")
);
crate::assert_with_log!(
!display_repr.is_empty(),
"display produces non-empty output",
true,
!display_repr.is_empty()
);
let task_explanation = diagnostics.explain_task_blocked(task_id);
let task_display = format!("{}", task_explanation);
crate::assert_with_log!(
task_display.contains("Task"),
"task display contains task marker",
true,
task_display.contains("Task")
);
crate::assert_with_log!(
!task_display.is_empty(),
"task display produces non-empty output",
true,
!task_display.is_empty()
);
crate::test_complete!("introspection_conf_003_serialization_roundtrip_correctness");
}
#[test]
fn introspection_conf_004_scheduler_state_accuracy_in_diagnostics() {
    // INTROSPECTION-CONF-004: scheduler state must be reflected faithfully by
    // diagnostics — a Running task reports AwaitingSchedule, a Completed task
    // reports Completed, a CancelRequested task reports CancelRequested, and
    // the region-level explanation lists live tasks (with accurate poll
    // counts) but never completed ones.
    //
    // BUGFIX: the reason loop below previously read `®ion_explanation.reasons`
    // (mojibake where `&reg` collapsed into `®` U+00AE), which does not
    // compile; restored to `&region_explanation.reasons`.
    init_test("introspection_conf_004_scheduler_state_accuracy_in_diagnostics");
    let mut state = RuntimeState::new();
    let root = state.create_root_region(Budget::INFINITE);
    let running_task = insert_task(&mut state, root, TaskState::Running);
    let completed_task = insert_task(&mut state, root, TaskState::Completed(Outcome::Ok(())));
    let cancel_task = insert_task(
        &mut state,
        root,
        TaskState::CancelRequested {
            reason: CancelReason::user("test"),
            cleanup_budget: Budget::with_deadline_ns(100_000_000),
        },
    );
    // Give the running task observable scheduler history so the region
    // explanation has a concrete poll count to report.
    let running_task_record = state.task_mut(running_task).expect("running task");
    running_task_record.total_polls = 5;
    running_task_record.last_polled_step = 100;
    let notified = running_task_record.wake_state.notify();
    crate::assert_with_log!(notified, "wake state notified", true, notified);
    let diagnostics = Diagnostics::new(Arc::new(state));
    // Running + notified => the task is waiting to be scheduled.
    let running_explanation = diagnostics.explain_task_blocked(running_task);
    crate::assert_with_log!(
        matches!(
            running_explanation.block_reason,
            BlockReason::AwaitingSchedule
        ),
        "running task shows awaiting schedule",
        true,
        matches!(
            running_explanation.block_reason,
            BlockReason::AwaitingSchedule
        )
    );
    let completed_explanation = diagnostics.explain_task_blocked(completed_task);
    crate::assert_with_log!(
        matches!(completed_explanation.block_reason, BlockReason::Completed),
        "completed task shows completed",
        true,
        matches!(completed_explanation.block_reason, BlockReason::Completed)
    );
    let cancel_explanation = diagnostics.explain_task_blocked(cancel_task);
    let is_cancel_requested = matches!(
        cancel_explanation.block_reason,
        BlockReason::CancelRequested { .. }
    );
    crate::assert_with_log!(
        is_cancel_requested,
        "cancel requested task shows cancel",
        true,
        is_cancel_requested
    );
    let region_explanation = diagnostics.explain_region_open(root);
    let mut found_running = false;
    // Completed tasks must never appear as blocking reasons, so this flag is
    // deliberately immutable and stays false.
    let found_completed = false;
    let mut found_cancel = false;
    for reason in &region_explanation.reasons {
        match reason {
            Reason::TaskRunning {
                task_id,
                poll_count,
                ..
            } if *task_id == running_task => {
                found_running = true;
                crate::assert_with_log!(
                    *poll_count == 5,
                    "poll count accuracy",
                    5u64,
                    *poll_count
                );
            }
            Reason::TaskRunning { task_id, .. } if *task_id == cancel_task => {
                found_cancel = true;
            }
            _ => {}
        }
    }
    crate::assert_with_log!(
        found_running,
        "found running task reason",
        true,
        found_running
    );
    crate::assert_with_log!(found_cancel, "found cancel task reason", true, found_cancel);
    crate::assert_with_log!(
        !found_completed,
        "completed task not blocking",
        true,
        !found_completed
    );
    crate::test_complete!("introspection_conf_004_scheduler_state_accuracy_in_diagnostics");
}
#[test]
fn introspection_conf_005_cross_method_consistency_of_diagnostic_data() {
    // INTROSPECTION-CONF-005: the three diagnostic entry points (region
    // explanation, task explanation, leak detection) must agree with each
    // other — the same task and the same held obligation are visible through
    // every applicable method, with a consistent age computed against the
    // shared virtual clock (created at 50ms, observed at 300ms => 250ms).
    //
    // BUGFIX: both reason loops below previously read
    // `®ion_explanation.reasons` (mojibake where `&reg` collapsed into `®`
    // U+00AE), which does not compile; restored to
    // `&region_explanation.reasons`.
    init_test("introspection_conf_005_cross_method_consistency_of_diagnostic_data");
    let mut state = RuntimeState::new();
    let root = state.create_root_region(Budget::INFINITE);
    let task_id = insert_task(&mut state, root, TaskState::Running);
    let clock = Arc::new(VirtualClock::starting_at(Time::from_millis(300)));
    state.set_timer_driver(TimerDriverHandle::with_virtual_clock(clock));
    let ob_id = insert_obligation(
        &mut state,
        root,
        task_id,
        ObligationKind::SendPermit,
        Time::from_millis(50),
    );
    let diagnostics = Diagnostics::new(Arc::new(state));
    let region_explanation = diagnostics.explain_region_open(root);
    let task_explanation = diagnostics.explain_task_blocked(task_id);
    let leaked_obligations = diagnostics.find_leaked_obligations();
    // The running task must appear among the region's open reasons.
    let mut task_in_region_reasons = false;
    for reason in &region_explanation.reasons {
        if let Reason::TaskRunning { task_id: id, .. } = reason {
            if *id == task_id {
                task_in_region_reasons = true;
                break;
            }
        }
    }
    crate::assert_with_log!(
        task_in_region_reasons,
        "task appears in region explanation",
        true,
        task_in_region_reasons
    );
    crate::assert_with_log!(
        task_explanation.task_id == task_id,
        "task explanation has correct ID",
        true,
        task_explanation.task_id == task_id
    );
    // The held obligation must appear in the same region's reasons, with the
    // same holder the leak detector reports.
    let mut obligation_in_region_reasons = false;
    for reason in &region_explanation.reasons {
        if let Reason::ObligationHeld {
            obligation_id: id,
            holder_task,
            ..
        } = reason
        {
            if *id == ob_id && *holder_task == task_id {
                obligation_in_region_reasons = true;
                break;
            }
        }
    }
    crate::assert_with_log!(
        obligation_in_region_reasons,
        "obligation appears in region explanation",
        true,
        obligation_in_region_reasons
    );
    crate::assert_with_log!(
        !leaked_obligations.is_empty(),
        "obligation appears in leak detection",
        true,
        !leaked_obligations.is_empty()
    );
    let found_leak = leaked_obligations
        .iter()
        .any(|leak| leak.obligation_id == ob_id && leak.holder_task == Some(task_id));
    crate::assert_with_log!(
        found_leak,
        "leak detection finds same obligation",
        true,
        found_leak
    );
    for leak in &leaked_obligations {
        if leak.obligation_id == ob_id {
            crate::assert_with_log!(
                leak.age.as_millis() == 250,
                "age calculation consistent",
                250u128,
                leak.age.as_millis()
            );
        }
    }
    crate::test_complete!("introspection_conf_005_cross_method_consistency_of_diagnostic_data");
}
#[test]
fn introspection_conf_006_temporal_consistency_of_diagnostic_queries() {
    // INTROSPECTION-CONF-006: obligation ages reported by diagnostics must
    // track the virtual clock — advancing time grows the reported age by
    // exactly the advance (900ms -> 1900ms here), while identity (leak count,
    // obligation id) stays stable, and the region explanation stays in sync.
    //
    // BUGFIX: the reason loop below previously read `®ion_explanation.reasons`
    // (mojibake where `&reg` collapsed into `®` U+00AE), which does not
    // compile; restored to `&region_explanation.reasons`.
    init_test("introspection_conf_006_temporal_consistency_of_diagnostic_queries");
    let mut state = RuntimeState::new();
    let root = state.create_root_region(Budget::INFINITE);
    let task_id = insert_task(&mut state, root, TaskState::Running);
    // Keep a handle on the clock so the test can advance it mid-flight.
    let clock = Arc::new(VirtualClock::starting_at(Time::from_millis(1000)));
    state.set_timer_driver(TimerDriverHandle::with_virtual_clock(Arc::clone(&clock)));
    let ob_id = insert_obligation(
        &mut state,
        root,
        task_id,
        ObligationKind::Ack,
        Time::from_millis(100),
    );
    let diagnostics = Diagnostics::new(Arc::new(state));
    let initial_leaks = diagnostics.find_leaked_obligations();
    crate::assert_with_log!(
        initial_leaks.len() == 1,
        "initial leak count",
        1usize,
        initial_leaks.len()
    );
    // Created at 100ms, observed at 1000ms => age 900ms.
    let initial_age = initial_leaks[0].age.as_millis();
    crate::assert_with_log!(
        initial_age == 900,
        "initial age calculation",
        900u128,
        initial_age
    );
    clock.advance_to(Time::from_millis(2000));
    let later_leaks = diagnostics.find_leaked_obligations();
    crate::assert_with_log!(
        later_leaks.len() == 1,
        "leak count preserved after clock advance",
        1usize,
        later_leaks.len()
    );
    // After advancing to 2000ms the same obligation is 1900ms old.
    let later_age = later_leaks[0].age.as_millis();
    crate::assert_with_log!(
        later_age == 1900,
        "updated age calculation",
        1900u128,
        later_age
    );
    crate::assert_with_log!(
        later_leaks[0].obligation_id == ob_id,
        "obligation ID preserved across time",
        true,
        later_leaks[0].obligation_id == ob_id
    );
    let region_explanation = diagnostics.explain_region_open(root);
    let mut obligation_reason_found = false;
    for reason in &region_explanation.reasons {
        if let Reason::ObligationHeld {
            obligation_id: id, ..
        } = reason
        {
            if *id == ob_id {
                obligation_reason_found = true;
                break;
            }
        }
    }
    crate::assert_with_log!(
        obligation_reason_found,
        "temporal consistency across methods",
        true,
        obligation_reason_found
    );
    crate::test_complete!("introspection_conf_006_temporal_consistency_of_diagnostic_queries");
}
// INTROSPECTION-CONF-007: leak detection reports exactly the leaked
// obligations (with accurate age and holder), and results come back sorted by
// region.
#[test]
fn introspection_conf_007_resource_leak_detection_accuracy() {
init_test("introspection_conf_007_resource_leak_detection_accuracy");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let child1 = insert_child_region(&mut state, root);
let child2 = insert_child_region(&mut state, root);
let task1 = insert_task(&mut state, child1, TaskState::Running);
let task2 = insert_task(&mut state, child2, TaskState::Running);
let completed_task = insert_task(&mut state, root, TaskState::Completed(Outcome::Ok(())));
let clock = Arc::new(VirtualClock::starting_at(Time::from_millis(1000)));
state.set_timer_driver(TimerDriverHandle::with_virtual_clock(clock));
// Two obligations held by running tasks (expected to be reported) ...
let leak1 = insert_obligation(
&mut state,
child1,
task1,
ObligationKind::Ack,
Time::from_millis(100),
);
let leak2 = insert_obligation(
&mut state,
child2,
task2,
ObligationKind::SendPermit,
Time::from_millis(200),
);
// ... and one held by a completed task, which the test expects NOT to be
// reported (leak count below is 2). NOTE(review): presumably
// find_leaked_obligations only flags obligations held by live tasks —
// confirm against its implementation.
let _non_leak = insert_obligation(
&mut state,
root,
completed_task,
ObligationKind::Lease,
Time::from_millis(300),
);
let diagnostics = Diagnostics::new(Arc::new(state));
let leaks = diagnostics.find_leaked_obligations();
crate::assert_with_log!(leaks.len() == 2, "accurate leak count", 2usize, leaks.len());
let leak_ids: Vec<ObligationId> = leaks.iter().map(|l| l.obligation_id).collect();
crate::assert_with_log!(
leak_ids.contains(&leak1),
"detects first leak",
true,
leak_ids.contains(&leak1)
);
crate::assert_with_log!(
leak_ids.contains(&leak2),
"detects second leak",
true,
leak_ids.contains(&leak2)
);
// Per-leak accuracy: ages are (now=1000ms - creation time) and holders match.
for leak in &leaks {
match leak.obligation_id {
id if id == leak1 => {
crate::assert_with_log!(
leak.age.as_millis() == 900, "leak1 age accuracy",
900u128,
leak.age.as_millis()
);
crate::assert_with_log!(
leak.holder_task == Some(task1),
"leak1 holder accuracy",
true,
leak.holder_task == Some(task1)
);
}
id if id == leak2 => {
crate::assert_with_log!(
leak.age.as_millis() == 800, "leak2 age accuracy",
800u128,
leak.age.as_millis()
);
crate::assert_with_log!(
leak.holder_task == Some(task2),
"leak2 holder accuracy",
true,
leak.holder_task == Some(task2)
);
}
_ => {
crate::assert_with_log!(false, "unexpected leak detected", true, false);
}
}
}
// Output ordering contract: leaks sorted by region id.
crate::assert_with_log!(
leaks[0].region_id <= leaks[1].region_id,
"leaks sorted by region",
true,
leaks[0].region_id <= leaks[1].region_id
);
crate::test_complete!("introspection_conf_007_resource_leak_detection_accuracy");
}
// INTROSPECTION-CONF-008: deadlock analysis over a three-task wait cycle
// (A, B, C linked via their waiter lists) must be deterministic across runs
// and classify the cycle as critical; structural health classification must
// also be stable.
#[test]
fn introspection_conf_008_deadlock_detection_determinism() {
init_test("introspection_conf_008_deadlock_detection_determinism");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let task_a = insert_task(&mut state, root, TaskState::Running);
let task_b = insert_task(&mut state, root, TaskState::Running);
let task_c = insert_task(&mut state, root, TaskState::Running);
// Build a closed wait cycle through the tasks' waiter lists.
state.task_mut(task_a).expect("task A").waiters.push(task_b);
state.task_mut(task_b).expect("task B").waiters.push(task_c);
state.task_mut(task_c).expect("task C").waiters.push(task_a);
let diagnostics = Diagnostics::new(Arc::new(state));
// Three analysis passes must agree on severity and cycle structure.
let report1 = diagnostics.analyze_directional_deadlock();
let report2 = diagnostics.analyze_directional_deadlock();
let report3 = diagnostics.analyze_directional_deadlock();
crate::assert_with_log!(
report1.severity == report2.severity,
"severity deterministic run1==run2",
true,
report1.severity == report2.severity
);
crate::assert_with_log!(
report2.severity == report3.severity,
"severity deterministic run2==run3",
true,
report2.severity == report3.severity
);
crate::assert_with_log!(
matches!(report1.severity, DeadlockSeverity::Critical),
"detects critical deadlock",
true,
matches!(report1.severity, DeadlockSeverity::Critical)
);
crate::assert_with_log!(
report1.cycles.len() == report2.cycles.len(),
"cycle count deterministic run1==run2",
report1.cycles.len(),
report2.cycles.len()
);
crate::assert_with_log!(
report2.cycles.len() == report3.cycles.len(),
"cycle count deterministic run2==run3",
report2.cycles.len(),
report3.cycles.len()
);
crate::assert_with_log!(
!report1.cycles.is_empty(),
"detects cycle",
true,
!report1.cycles.is_empty()
);
// The first detected cycle must contain all three tasks and agree across
// runs on membership count and trapped status.
if !report1.cycles.is_empty() {
let cycle1 = &report1.cycles[0];
let cycle2 = &report2.cycles[0];
let cycle3 = &report3.cycles[0];
crate::assert_with_log!(
cycle1.tasks.len() == cycle2.tasks.len(),
"cycle task count deterministic run1==run2",
cycle1.tasks.len(),
cycle2.tasks.len()
);
crate::assert_with_log!(
cycle2.tasks.len() == cycle3.tasks.len(),
"cycle task count deterministic run2==run3",
cycle2.tasks.len(),
cycle3.tasks.len()
);
crate::assert_with_log!(
cycle1.tasks.contains(&task_a),
"cycle contains task A",
true,
cycle1.tasks.contains(&task_a)
);
crate::assert_with_log!(
cycle1.tasks.contains(&task_b),
"cycle contains task B",
true,
cycle1.tasks.contains(&task_b)
);
crate::assert_with_log!(
cycle1.tasks.contains(&task_c),
"cycle contains task C",
true,
cycle1.tasks.contains(&task_c)
);
crate::assert_with_log!(
cycle1.trapped == cycle2.trapped,
"trapped status deterministic",
true,
cycle1.trapped == cycle2.trapped
);
}
// Structural health: two runs must land in the same classification variant
// (payload fields are ignored, hence the { .. } patterns).
let health1 = diagnostics.analyze_structural_health();
let health2 = diagnostics.analyze_structural_health();
let health_deterministic = match (&health1.classification, &health2.classification) {
(
crate::observability::spectral_health::HealthClassification::Healthy { .. },
crate::observability::spectral_health::HealthClassification::Healthy { .. },
) => true,
(
crate::observability::spectral_health::HealthClassification::Deadlocked,
crate::observability::spectral_health::HealthClassification::Deadlocked,
) => true,
(
crate::observability::spectral_health::HealthClassification::Degraded { .. },
crate::observability::spectral_health::HealthClassification::Degraded { .. },
) => true,
(
crate::observability::spectral_health::HealthClassification::Critical { .. },
crate::observability::spectral_health::HealthClassification::Critical { .. },
) => true,
(
crate::observability::spectral_health::HealthClassification::Fragmented { .. },
crate::observability::spectral_health::HealthClassification::Fragmented { .. },
) => true,
_ => false,
};
crate::assert_with_log!(
health_deterministic,
"structural health deterministic",
true,
health_deterministic
);
crate::test_complete!("introspection_conf_008_deadlock_detection_determinism");
}
// INTROSPECTION-CONF-009: every diagnostic query is repeated twice (baseline
// then verify) against the same state; since queries are read-only, the two
// passes must produce equivalent results even with a cancel-requested task in
// the mix.
#[test]
fn introspection_conf_009_diagnostic_query_cancel_safety() {
init_test("introspection_conf_009_diagnostic_query_cancel_safety");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let task1 = insert_task(&mut state, root, TaskState::Running);
let task2 = insert_task(
&mut state,
root,
TaskState::CancelRequested {
reason: CancelReason::user("test"),
cleanup_budget: Budget::with_deadline_ns(100_000_000), },
);
let clock = Arc::new(VirtualClock::starting_at(Time::from_millis(1000)));
state.set_timer_driver(TimerDriverHandle::with_virtual_clock(clock));
let _ob1 = insert_obligation(
&mut state,
root,
task1,
ObligationKind::Ack,
Time::from_millis(100),
);
let _ob2 = insert_obligation(
&mut state,
root,
task2,
ObligationKind::Lease,
Time::from_millis(200),
);
let diagnostics = Diagnostics::new(Arc::new(state));
// First pass over every diagnostic entry point.
let baseline_explanation = diagnostics.explain_region_open(root);
let baseline_task1_explanation = diagnostics.explain_task_blocked(task1);
let baseline_task2_explanation = diagnostics.explain_task_blocked(task2);
let baseline_leaks = diagnostics.find_leaked_obligations();
let baseline_deadlock = diagnostics.analyze_directional_deadlock();
let baseline_health = diagnostics.analyze_structural_health();
// Second pass: must match the first in every observable respect.
let verify_explanation = diagnostics.explain_region_open(root);
let verify_task1_explanation = diagnostics.explain_task_blocked(task1);
let verify_task2_explanation = diagnostics.explain_task_blocked(task2);
let verify_leaks = diagnostics.find_leaked_obligations();
let verify_deadlock = diagnostics.analyze_directional_deadlock();
let verify_health = diagnostics.analyze_structural_health();
crate::assert_with_log!(
baseline_explanation.reasons.len() == verify_explanation.reasons.len(),
"region explanation cancel-safe",
baseline_explanation.reasons.len(),
verify_explanation.reasons.len()
);
// Block reasons compared structurally, with a Debug-string fallback for
// variants carrying payloads.
let task1_reason_match = match (
&baseline_task1_explanation.block_reason,
&verify_task1_explanation.block_reason,
) {
(BlockReason::AwaitingSchedule, BlockReason::AwaitingSchedule) => true,
(BlockReason::NotStarted, BlockReason::NotStarted) => true,
(a, b) => format!("{:?}", a) == format!("{:?}", b),
};
crate::assert_with_log!(
task1_reason_match,
"task1 explanation cancel-safe",
true,
task1_reason_match
);
let task2_cancel_match = matches!(
(
&baseline_task2_explanation.block_reason,
&verify_task2_explanation.block_reason
),
(
BlockReason::CancelRequested { .. },
BlockReason::CancelRequested { .. }
)
);
crate::assert_with_log!(
task2_cancel_match,
"task2 cancel explanation cancel-safe",
true,
task2_cancel_match
);
crate::assert_with_log!(
baseline_leaks.len() == verify_leaks.len(),
"leak detection cancel-safe",
baseline_leaks.len(),
verify_leaks.len()
);
let deadlock_severity_match = baseline_deadlock.severity == verify_deadlock.severity;
crate::assert_with_log!(
deadlock_severity_match,
"deadlock detection cancel-safe",
true,
deadlock_severity_match
);
// Health classifications must land in the same variant (payloads ignored).
let health_class_match = match (
&baseline_health.classification,
&verify_health.classification,
) {
(
crate::observability::spectral_health::HealthClassification::Healthy { .. },
crate::observability::spectral_health::HealthClassification::Healthy { .. },
) => true,
(
crate::observability::spectral_health::HealthClassification::Deadlocked,
crate::observability::spectral_health::HealthClassification::Deadlocked,
) => true,
(
crate::observability::spectral_health::HealthClassification::Degraded { .. },
crate::observability::spectral_health::HealthClassification::Degraded { .. },
) => true,
(
crate::observability::spectral_health::HealthClassification::Critical { .. },
crate::observability::spectral_health::HealthClassification::Critical { .. },
) => true,
(
crate::observability::spectral_health::HealthClassification::Fragmented { .. },
crate::observability::spectral_health::HealthClassification::Fragmented { .. },
) => true,
_ => false,
};
crate::assert_with_log!(
health_class_match,
"health analysis cancel-safe",
true,
health_class_match
);
// Pairwise leak comparison: same ids and same ages in both passes.
for (baseline_leak, verify_leak) in baseline_leaks.iter().zip(verify_leaks.iter()) {
crate::assert_with_log!(
baseline_leak.obligation_id == verify_leak.obligation_id,
"leak ID cancel-safe",
true,
baseline_leak.obligation_id == verify_leak.obligation_id
);
crate::assert_with_log!(
baseline_leak.age == verify_leak.age,
"leak age cancel-safe",
true,
baseline_leak.age == verify_leak.age
);
}
crate::test_complete!("introspection_conf_009_diagnostic_query_cancel_safety");
}
// INTROSPECTION-CONF-010: diagnostic endpoints must behave sanely across four
// runtime shapes — empty (no regions), minimal (one region), complex (nested
// regions with mixed task states), and timer-enabled (virtual clock driving
// obligation ages) — returning not-found markers instead of panicking and
// keeping risk scores / health classifications in their valid domains.
#[test]
fn introspection_conf_010_runtime_introspection_endpoint_stability() {
init_test("introspection_conf_010_runtime_introspection_endpoint_stability");
// Shape 1: empty runtime — unknown ids yield explicit not-found reasons.
let empty_state = Arc::new(RuntimeState::new());
let empty_diagnostics = Diagnostics::new(empty_state);
let empty_root = RegionId::new_for_test(999, 0);
let empty_explanation = empty_diagnostics.explain_region_open(empty_root);
crate::assert_with_log!(
matches!(
empty_explanation.reasons.first(),
Some(Reason::RegionNotFound)
),
"empty runtime handles missing region",
true,
matches!(
empty_explanation.reasons.first(),
Some(Reason::RegionNotFound)
)
);
let empty_task = TaskId::new_for_test(999, 0);
let empty_task_explanation = empty_diagnostics.explain_task_blocked(empty_task);
crate::assert_with_log!(
matches!(
empty_task_explanation.block_reason,
BlockReason::TaskNotFound
),
"empty runtime handles missing task",
true,
matches!(
empty_task_explanation.block_reason,
BlockReason::TaskNotFound
)
);
let empty_leaks = empty_diagnostics.find_leaked_obligations();
crate::assert_with_log!(
empty_leaks.is_empty(),
"empty runtime has no leaks",
true,
empty_leaks.is_empty()
);
// Shape 2: minimal runtime — a single root region, no tasks.
let mut minimal_state = RuntimeState::new();
let minimal_root = minimal_state.create_root_region(Budget::INFINITE);
let minimal_diagnostics = Diagnostics::new(Arc::new(minimal_state));
let minimal_explanation = minimal_diagnostics.explain_region_open(minimal_root);
crate::assert_with_log!(
minimal_explanation.region_state.is_some(),
"minimal runtime provides region state",
true,
minimal_explanation.region_state.is_some()
);
let minimal_leaks = minimal_diagnostics.find_leaked_obligations();
crate::assert_with_log!(
minimal_leaks.is_empty(),
"minimal runtime has no leaks",
true,
minimal_leaks.is_empty()
);
// Shape 3: complex runtime — nested regions, running/completed/cancelling
// tasks.
let mut complex_state = RuntimeState::new();
let complex_root = complex_state.create_root_region(Budget::INFINITE);
let complex_child1 = insert_child_region(&mut complex_state, complex_root);
let complex_child2 = insert_child_region(&mut complex_state, complex_root);
let _complex_task1 = insert_task(&mut complex_state, complex_child1, TaskState::Running);
let _complex_task2 = insert_task(
&mut complex_state,
complex_child2,
TaskState::Completed(Outcome::Ok(())),
);
let complex_task3 = insert_task(
&mut complex_state,
complex_root,
TaskState::CancelRequested {
reason: CancelReason::user("cleanup"),
cleanup_budget: Budget::with_deadline_ns(200_000_000), },
);
let complex_diagnostics = Diagnostics::new(Arc::new(complex_state));
let complex_explanation = complex_diagnostics.explain_region_open(complex_root);
crate::assert_with_log!(
!complex_explanation.reasons.is_empty(),
"complex runtime provides detailed reasons",
true,
!complex_explanation.reasons.is_empty()
);
let complex_cancel_explanation = complex_diagnostics.explain_task_blocked(complex_task3);
let is_complex_cancel = matches!(
complex_cancel_explanation.block_reason,
BlockReason::CancelRequested { .. }
);
crate::assert_with_log!(
is_complex_cancel,
"complex runtime tracks cancel state",
true,
is_complex_cancel
);
// Shape 4: timer-enabled runtime — obligation created at 1000ms, clock at
// 5000ms => expected age 4000ms.
let mut timer_state = RuntimeState::new();
let timer_root = timer_state.create_root_region(Budget::INFINITE);
let timer_task = insert_task(&mut timer_state, timer_root, TaskState::Running);
let virtual_clock = Arc::new(VirtualClock::starting_at(Time::from_millis(5000)));
timer_state.set_timer_driver(TimerDriverHandle::with_virtual_clock(virtual_clock));
let _timer_obligation = insert_obligation(
&mut timer_state,
timer_root,
timer_task,
ObligationKind::Ack,
Time::from_millis(1000),
);
let timer_diagnostics = Diagnostics::new(Arc::new(timer_state));
let timer_leaks = timer_diagnostics.find_leaked_obligations();
crate::assert_with_log!(
!timer_leaks.is_empty(),
"timer-enabled runtime detects leaks",
true,
!timer_leaks.is_empty()
);
if let Some(leak) = timer_leaks.first() {
crate::assert_with_log!(
leak.age.as_millis() == 4000, "timer-enabled runtime calculates age correctly",
4000u128,
leak.age.as_millis()
);
}
// Cross-shape invariants: risk scores stay within [0, 1] ...
let all_deadlock_reports = [
empty_diagnostics.analyze_directional_deadlock(),
minimal_diagnostics.analyze_directional_deadlock(),
complex_diagnostics.analyze_directional_deadlock(),
timer_diagnostics.analyze_directional_deadlock(),
];
for report in &all_deadlock_reports {
crate::assert_with_log!(
report.risk_score >= 0.0 && report.risk_score <= 1.0,
"deadlock risk score in valid range",
true,
report.risk_score >= 0.0 && report.risk_score <= 1.0
);
}
// ... and health classifications are always one of the known variants.
let all_health_reports = [
empty_diagnostics.analyze_structural_health(),
minimal_diagnostics.analyze_structural_health(),
complex_diagnostics.analyze_structural_health(),
timer_diagnostics.analyze_structural_health(),
];
for health in &all_health_reports {
let valid_classification = matches!(
health.classification,
crate::observability::spectral_health::HealthClassification::Healthy { .. }
| crate::observability::spectral_health::HealthClassification::Degraded { .. }
| crate::observability::spectral_health::HealthClassification::Deadlocked
| crate::observability::spectral_health::HealthClassification::Critical { .. }
| crate::observability::spectral_health::HealthClassification::Fragmented { .. }
);
crate::assert_with_log!(
valid_classification,
"health classification is valid",
true,
valid_classification
);
}
crate::test_complete!("introspection_conf_010_runtime_introspection_endpoint_stability");
}
#[test]
fn structured_diagnostic_report_snapshot_v4_with_extended_metrics() {
init_test("structured_diagnostic_report_snapshot_v4_with_extended_metrics");
let sections = vec![
DiagnosticReportV4Section {
label: "task_execution",
status: "degraded",
accounting: DiagnosticResourceAccounting {
total_regions: 12,
open_regions: 8,
total_tasks: 142,
live_tasks: 89,
total_obligations: 23,
leaked_obligations: 2,
},
histograms: vec![DiagnosticMetricHistogram {
name: "task_execution_latency_ms".to_string(),
buckets: vec![
("0-1ms".to_string(), 1247),
("1-5ms".to_string(), 823),
("5-10ms".to_string(), 156),
("10-50ms".to_string(), 89),
("50-100ms".to_string(), 12),
("100ms+".to_string(), 3),
],
total_count: 2330,
percentiles: vec![(50.0, 1.2), (95.0, 8.7), (99.0, 24.1), (99.9, 78.3)],
}],
rendered: "task_execution: Task execution metrics\n latency_p50: 1.2ms\n latency_p99: 24.1ms\n completion_rate: 94.2%",
},
DiagnosticReportV4Section {
label: "region_lifecycle",
status: "critical",
accounting: DiagnosticResourceAccounting {
total_regions: 67,
open_regions: 37,
total_tasks: 245,
live_tasks: 156,
total_obligations: 45,
leaked_obligations: 8,
},
histograms: vec![DiagnosticMetricHistogram {
name: "region_lifecycle_duration_ms".to_string(),
buckets: vec![
("0-10ms".to_string(), 89),
("10-100ms".to_string(), 156),
("100ms-1s".to_string(), 67),
("1s-10s".to_string(), 23),
("10s+".to_string(), 8),
],
total_count: 343,
percentiles: vec![(50.0, 45.2), (95.0, 2100.0), (99.0, 6780.0)],
}],
rendered: "region_lifecycle: Region lifecycle analysis\n duration_p50: 45.2ms\n duration_p99: 6.78s\n long_lived_count: 8",
},
DiagnosticReportV4Section {
label: "obligation_tracking",
status: "passing",
accounting: DiagnosticResourceAccounting {
total_regions: 34,
open_regions: 23,
total_tasks: 178,
live_tasks: 134,
total_obligations: 67,
leaked_obligations: 0,
},
histograms: vec![DiagnosticMetricHistogram {
name: "obligation_hold_time_ms".to_string(),
buckets: vec![
("0-1s".to_string(), 1200),
("1s-10s".to_string(), 45),
("10s-1min".to_string(), 12),
("1min+".to_string(), 3),
],
total_count: 1260,
percentiles: vec![
(50.0, 120.0),
(95.0, 8900.0),
(99.0, 45000.0),
(99.9, 120000.0),
],
}],
rendered: "obligation_tracking: Obligation hold time analysis\n hold_time_p50: 120ms\n hold_time_p99: 45s\n leak_candidates: 3",
},
];
let rendered = render_structured_diagnostic_report_v4(&sections);
assert_diagnostic_report_snapshot(
"observability_diagnostics_structured_report_v4",
&rendered,
);
crate::test_complete!("structured_diagnostic_report_snapshot_v4_with_extended_metrics");
}
#[test]
fn structured_diagnostic_report_v3_schema_golden_snapshot() {
    // Golden-snapshot test for the v3 structured report schema: renders three
    // sections spanning the health spectrum (passing / degraded / critical)
    // and pins the output against the stored snapshot.
    //
    // Accounting fixtures, one per tier:
    // - healthy: everything drained, no leaks
    // - degraded: partial drain, one leaked obligation
    // - error: nothing drained, every obligation leaked
    let healthy_accounting = DiagnosticResourceAccounting {
        total_regions: 1,
        open_regions: 0,
        total_tasks: 1,
        live_tasks: 0,
        total_obligations: 0,
        leaked_obligations: 0,
    };
    let degraded_accounting = DiagnosticResourceAccounting {
        total_regions: 3,
        open_regions: 2,
        total_tasks: 5,
        live_tasks: 3,
        total_obligations: 2,
        leaked_obligations: 1,
    };
    let error_accounting = DiagnosticResourceAccounting {
        total_regions: 2,
        open_regions: 2,
        total_tasks: 4,
        live_tasks: 4,
        total_obligations: 3,
        leaked_obligations: 3,
    };
    // Section bodies are fixed pre-rendered payloads so the snapshot only
    // exercises the v3 framing/schema, not the underlying renderers.
    let sections = vec![
        DiagnosticReportV3Section {
            label: "healthy_system",
            status: "passing",
            accounting: healthy_accounting,
            rendered: "scenario: healthy_system\ngenerated_at: 2026-04-21T14:30:00Z\n\n[region]\nnone\n\n[task]\nAll tasks completed successfully.\n\n[leaks]\nnone",
        },
        DiagnosticReportV3Section {
            label: "degraded_performance",
            status: "degraded",
            accounting: degraded_accounting,
            rendered: "scenario: degraded_performance\ngenerated_at: 2026-04-21T14:30:01Z\n\n[region]\nRegion RegionId(1:0) has slow drain (2 children pending).\n\n[task]\nTask TaskId(2:1) experiencing high latency (p99: 450ms).\n\n[leaks]\n- ObligationId(1:0) region=RegionId(1:0) holder=Some(TaskId(2:1)) type=Lease age_ms=1500",
        },
        DiagnosticReportV3Section {
            label: "error_state",
            status: "critical",
            accounting: error_accounting,
            rendered: "scenario: error_state\ngenerated_at: 2026-04-21T14:30:02Z\n\n[region]\nRegion RegionId(0:0) has deadlock detected (cycle length: 2).\n\n[task]\nTask TaskId(0:0) blocked: deadlock (waiting on TaskId(1:0)).\n\n[leaks]\n- ObligationId(0:0) region=RegionId(0:0) holder=Some(TaskId(0:0)) type=Ack age_ms=2000\n- ObligationId(1:0) region=RegionId(0:0) holder=Some(TaskId(1:0)) type=Lease age_ms=2100\n- ObligationId(2:0) region=RegionId(1:0) holder=Some(TaskId(2:0)) type=Ack age_ms=1800",
        },
    ];
    // Was `(§ions)` — mojibake from `&sect` → `§`; restored to `&sections`.
    let rendered = render_structured_diagnostic_report_v3(&sections);
    assert_diagnostic_report_snapshot(
        "observability_diagnostics_v3_schema_validation",
        &rendered,
    );
    // Completion marker, matching the convention used by the sibling v4
    // snapshot test in this module.
    crate::test_complete!("structured_diagnostic_report_v3_schema_golden_snapshot");
}
}