use crate::console::Console;
use crate::observability::spectral_health::{
SpectralHealthMonitor, SpectralHealthReport, SpectralThresholds,
};
const MAX_SANITIZED_CANCEL_MESSAGE_LEN: usize = 256;
fn sanitize_cancel_message(input: &str) -> String {
let mut out = String::with_capacity(input.len().min(MAX_SANITIZED_CANCEL_MESSAGE_LEN + 3));
let mut byte_count = 0usize;
let mut truncated = false;
for ch in input.chars() {
let mapped = match ch {
'\r' | '\n' | '\t' => ' ',
c if (c as u32) < 0x20 || c == '\u{7f}' => '?',
c => c,
};
let mapped_len = mapped.len_utf8();
if byte_count.saturating_add(mapped_len) > MAX_SANITIZED_CANCEL_MESSAGE_LEN {
truncated = true;
break;
}
out.push(mapped);
byte_count = byte_count.saturating_add(mapped_len);
}
if truncated {
out.push('…');
}
out
}
use crate::record::ObligationState;
use crate::record::region::RegionState;
use crate::record::task::TaskState;
use crate::runtime::state::RuntimeState;
use crate::time::TimerDriverHandle;
use crate::tracing_compat::{debug, trace, warn};
use crate::types::{CancelKind, ObligationId, RegionId, TaskId, Time};
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::fmt;
use std::sync::Arc;
#[derive(Debug)]
pub struct Diagnostics {
state: Arc<RuntimeState>,
spectral_monitor: parking_lot::Mutex<SpectralHealthMonitor>,
}
impl Diagnostics {
#[must_use]
pub fn new(state: Arc<RuntimeState>) -> Self {
Self {
state,
spectral_monitor: parking_lot::Mutex::new(SpectralHealthMonitor::new(
SpectralThresholds::default(),
)),
}
}
#[must_use]
pub fn with_console(state: Arc<RuntimeState>, _console: Console) -> Self {
Self {
state,
spectral_monitor: parking_lot::Mutex::new(SpectralHealthMonitor::new(
SpectralThresholds::default(),
)),
}
}
fn now(&self) -> Time {
self.state
.timer_driver()
.map_or(self.state.now, TimerDriverHandle::now)
}
fn build_task_wait_graph(&self) -> TaskWaitGraph {
let mut task_ids: Vec<TaskId> = self
.state
.tasks_iter()
.filter_map(|(_, task)| (!task.state.is_terminal()).then_some(task.id))
.collect();
task_ids.sort();
let index_by_task: BTreeMap<TaskId, usize> = task_ids
.iter()
.enumerate()
.map(|(i, id)| (*id, i))
.collect();
let mut directed_edges = Vec::new();
for (_, task) in self.state.tasks_iter() {
if task.state.is_terminal() {
continue;
}
let Some(&target_idx) = index_by_task.get(&task.id) else {
continue;
};
for waiter in &task.waiters {
if let Some(&waiter_idx) = index_by_task.get(waiter) {
directed_edges.push((waiter_idx, target_idx));
}
}
}
directed_edges.sort_unstable();
directed_edges.dedup();
let undirected_edges: Vec<(usize, usize)> = directed_edges
.iter()
.map(|(u, v)| if u < v { (*u, *v) } else { (*v, *u) })
.collect::<std::collections::BTreeSet<_>>()
.into_iter()
.collect();
TaskWaitGraph {
task_ids,
directed_edges,
undirected_edges,
}
}
#[must_use]
pub fn analyze_structural_health(&self) -> SpectralHealthReport {
let graph = self.build_task_wait_graph();
let adjacency = wait_graph_adjacency(&graph);
let mut monitor = self.spectral_monitor.lock();
monitor.analyze_with_trapped_cycle(
graph.task_ids.len(),
&graph.undirected_edges,
has_trapped_wait_cycle(&adjacency),
)
}
#[must_use]
pub fn analyze_directional_deadlock(&self) -> DirectionalDeadlockReport {
let graph = self.build_task_wait_graph();
if graph.task_ids.is_empty() {
return DirectionalDeadlockReport::empty();
}
let adjacency = wait_graph_adjacency(&graph);
let sccs = strongly_connected_components(&adjacency);
let mut components = Vec::new();
let mut trapped = 0_u32;
let mut cycle_nodes = 0_usize;
for nodes in sccs {
let has_cycle = if nodes.len() > 1 {
true
} else {
let n0 = nodes[0];
adjacency[n0].contains(&n0)
};
if !has_cycle {
continue;
}
cycle_nodes = cycle_nodes.saturating_add(nodes.len());
let mut ingress = 0_u32;
let mut egress = 0_u32;
for &u in &nodes {
for &v in &adjacency[u] {
if nodes.binary_search(&v).is_ok() {
continue;
}
egress = egress.saturating_add(1);
}
}
let node_set: std::collections::BTreeSet<usize> = nodes.iter().copied().collect();
for (u, edges) in adjacency.iter().enumerate() {
if node_set.contains(&u) {
continue;
}
for &v in edges {
if node_set.contains(&v) {
ingress = ingress.saturating_add(1);
}
}
}
let trapped_component = egress == 0;
if trapped_component {
trapped = trapped.saturating_add(1);
}
let mut tasks: Vec<TaskId> = nodes.iter().map(|idx| graph.task_ids[*idx]).collect();
tasks.sort();
components.push(DeadlockCycle {
tasks,
ingress_edges: ingress,
egress_edges: egress,
trapped: trapped_component,
});
}
components.sort_by_key(|c| c.tasks.len());
components.reverse();
#[allow(clippy::cast_precision_loss)]
let cycle_ratio = if graph.task_ids.is_empty() {
0.0
} else {
cycle_nodes as f64 / graph.task_ids.len() as f64
};
#[allow(clippy::cast_precision_loss)]
let trapped_ratio = if components.is_empty() {
0.0
} else {
f64::from(trapped) / components.len() as f64
};
let risk_score = 0.6f64
.mul_add(trapped_ratio, 0.4 * cycle_ratio)
.clamp(0.0, 1.0);
let severity = if trapped > 0 {
DeadlockSeverity::Critical
} else if !components.is_empty() {
DeadlockSeverity::Elevated
} else {
DeadlockSeverity::None
};
DirectionalDeadlockReport {
severity,
risk_score,
cycles: components,
}
}
#[must_use]
pub fn explain_region_open(&self, region_id: RegionId) -> RegionOpenExplanation {
trace!(region_id = ?region_id, "diagnostics: explain_region_open");
let Some(region) = self.state.region(region_id) else {
return RegionOpenExplanation {
region_id,
region_state: None,
reasons: vec![Reason::RegionNotFound],
recommendations: vec!["Verify region id is valid".to_string()],
};
};
let region_state = region.state();
if region_state == RegionState::Closed {
return RegionOpenExplanation {
region_id,
region_state: Some(region_state),
reasons: Vec::new(),
recommendations: Vec::new(),
};
}
let mut reasons = Vec::new();
let mut child_ids = region.child_ids();
child_ids.sort();
for child_id in child_ids {
if let Some(child) = self.state.region(child_id) {
let child_state = child.state();
if child_state != RegionState::Closed {
reasons.push(Reason::ChildRegionOpen {
child_id,
child_state,
});
}
}
}
let mut task_ids = region.task_ids();
task_ids.sort();
for task_id in task_ids {
if let Some(task) = self.state.task(task_id) {
if !task.state.is_terminal() {
reasons.push(Reason::TaskRunning {
task_id,
task_state: task.state_name().to_string(),
poll_count: task.total_polls,
});
}
}
}
let mut held = Vec::new();
for (_, ob) in self.state.obligations_iter() {
if ob.region == region_id && ob.state == ObligationState::Reserved {
held.push((ob.id, ob.holder, ob.kind));
}
}
held.sort_by_key(|(id, _, _)| *id);
for (id, holder, kind) in held {
reasons.push(Reason::ObligationHeld {
obligation_id: id,
obligation_type: format!("{kind:?}"),
holder_task: holder,
});
}
let mut recommendations = Vec::new();
if reasons
.iter()
.any(|r| matches!(r, Reason::ChildRegionOpen { .. }))
{
recommendations.push("Wait for child regions to close, or cancel them.".to_string());
}
if reasons
.iter()
.any(|r| matches!(r, Reason::TaskRunning { .. }))
{
recommendations
.push("Wait for live tasks to complete, or cancel the region.".to_string());
}
if reasons
.iter()
.any(|r| matches!(r, Reason::ObligationHeld { .. }))
{
recommendations
.push("Ensure obligations are committed/aborted before closing.".to_string());
}
let deadlock = self.analyze_directional_deadlock();
if deadlock.severity != DeadlockSeverity::None {
recommendations.push(format!(
"Directional deadlock risk {:?} (score {:.3}); inspect cycles and break wait-for loops.",
deadlock.severity, deadlock.risk_score
));
}
debug!(
region_id = ?region_id,
region_state = ?region_state,
reason_count = reasons.len(),
"diagnostics: region open explanation computed"
);
RegionOpenExplanation {
region_id,
region_state: Some(region_state),
reasons,
recommendations,
}
}
#[must_use]
pub fn explain_task_blocked(&self, task_id: TaskId) -> TaskBlockedExplanation {
trace!(task_id = ?task_id, "diagnostics: explain_task_blocked");
let Some(task) = self.state.task(task_id) else {
return TaskBlockedExplanation {
task_id,
block_reason: BlockReason::TaskNotFound,
details: Vec::new(),
recommendations: vec!["Verify task id is valid".to_string()],
};
};
let mut details = Vec::new();
let mut recommendations = Vec::new();
let block_reason = match &task.state {
TaskState::Created => {
recommendations.push("Task has not started polling yet.".to_string());
BlockReason::NotStarted
}
TaskState::Running => {
if task.wake_state.is_notified() {
recommendations
.push("Task has a pending wake; it should be scheduled soon.".to_string());
BlockReason::AwaitingSchedule
} else {
recommendations
.push("Task appears to be awaiting an async operation.".to_string());
BlockReason::AwaitingFuture {
description: "unknown await point".to_string(),
}
}
}
TaskState::CancelRequested { reason, .. } => {
details.push(format!("cancel kind: {}", reason.kind));
if let Some(msg) = &reason.message.as_deref() {
details.push(format!("message: {}", sanitize_cancel_message(msg)));
}
recommendations.push("Task is cancelling; wait for drain/finalizers.".to_string());
BlockReason::CancelRequested {
reason: CancelReasonInfo::from_reason(reason.kind, reason.message.as_deref()),
}
}
TaskState::Cancelling {
reason,
cleanup_budget,
} => {
details.push(format!("cancel kind: {}", reason.kind));
details.push(format!(
"cleanup polls remaining: {}",
cleanup_budget.poll_quota
));
BlockReason::RunningCleanup {
reason: CancelReasonInfo::from_reason(reason.kind, reason.message.as_deref()),
polls_remaining: cleanup_budget.poll_quota,
}
}
TaskState::Finalizing {
reason,
cleanup_budget,
} => {
details.push(format!("cancel kind: {}", reason.kind));
details.push(format!(
"cleanup polls remaining: {}",
cleanup_budget.poll_quota
));
BlockReason::Finalizing {
reason: CancelReasonInfo::from_reason(reason.kind, reason.message.as_deref()),
polls_remaining: cleanup_budget.poll_quota,
}
}
TaskState::Completed(outcome) => {
details.push(format!("outcome: {outcome:?}"));
BlockReason::Completed
}
};
if !task.waiters.is_empty() {
details.push(format!("waiters: {}", task.waiters.len()));
}
TaskBlockedExplanation {
task_id,
block_reason,
details,
recommendations,
}
}
#[must_use]
pub fn find_leaked_obligations(&self) -> Vec<ObligationLeak> {
let now = self.now();
let mut leaks = Vec::new();
for (_, ob) in self.state.obligations_iter() {
if ob.state == ObligationState::Reserved {
if let Some(holder) = self.state.task(ob.holder) {
if matches!(holder.state, TaskState::Completed(_)) {
continue;
}
}
let age = std::time::Duration::from_nanos(now.duration_since(ob.reserved_at));
leaks.push(ObligationLeak {
obligation_id: ob.id,
obligation_type: format!("{:?}", ob.kind),
holder_task: Some(ob.holder),
region_id: ob.region,
age,
});
}
}
leaks.sort_by_key(|l| (l.region_id, l.obligation_id));
if !leaks.is_empty() {
warn!(
count = leaks.len(),
"diagnostics: potential obligation leaks detected"
);
}
leaks
}
}
#[derive(Debug, Clone)]
struct TaskWaitGraph {
task_ids: Vec<TaskId>,
directed_edges: Vec<(usize, usize)>,
undirected_edges: Vec<(usize, usize)>,
}
fn wait_graph_adjacency(graph: &TaskWaitGraph) -> Vec<Vec<usize>> {
let mut adjacency = vec![Vec::new(); graph.task_ids.len()];
for &(u, v) in &graph.directed_edges {
if u < adjacency.len() && v < adjacency.len() {
adjacency[u].push(v);
}
}
for edges in &mut adjacency {
edges.sort_unstable();
edges.dedup();
}
adjacency
}
fn has_trapped_wait_cycle(adjacency: &[Vec<usize>]) -> bool {
for nodes in strongly_connected_components(adjacency) {
let has_cycle = if nodes.len() > 1 {
true
} else {
let n0 = nodes[0];
adjacency[n0].contains(&n0)
};
if !has_cycle {
continue;
}
let node_set: std::collections::BTreeSet<usize> = nodes.iter().copied().collect();
let has_egress = nodes
.iter()
.any(|&u| adjacency[u].iter().any(|v| !node_set.contains(v)));
if !has_egress {
return true;
}
}
false
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DeadlockSeverity {
None,
Elevated,
Critical,
}
#[derive(Debug, Clone)]
pub struct DeadlockCycle {
pub tasks: Vec<TaskId>,
pub ingress_edges: u32,
pub egress_edges: u32,
pub trapped: bool,
}
#[derive(Debug, Clone)]
pub struct DirectionalDeadlockReport {
pub severity: DeadlockSeverity,
pub risk_score: f64,
pub cycles: Vec<DeadlockCycle>,
}
impl DirectionalDeadlockReport {
#[must_use]
fn empty() -> Self {
Self {
severity: DeadlockSeverity::None,
risk_score: 0.0,
cycles: Vec::new(),
}
}
}
#[must_use]
fn strongly_connected_components(adjacency: &[Vec<usize>]) -> Vec<Vec<usize>> {
struct Tarjan<'a> {
adjacency: &'a [Vec<usize>],
index: usize,
stack: Vec<usize>,
on_stack: Vec<bool>,
indices: Vec<Option<usize>>,
lowlink: Vec<usize>,
sccs: Vec<Vec<usize>>,
}
impl Tarjan<'_> {
fn strongconnect(&mut self, v: usize) {
self.indices[v] = Some(self.index);
self.lowlink[v] = self.index;
self.index += 1;
self.stack.push(v);
self.on_stack[v] = true;
for &w in &self.adjacency[v] {
if self.indices[w].is_none() {
self.strongconnect(w);
self.lowlink[v] = self.lowlink[v].min(self.lowlink[w]);
} else if self.on_stack[w] {
self.lowlink[v] = self.lowlink[v].min(self.indices[w].unwrap_or(usize::MAX));
}
}
if self.lowlink[v] == self.indices[v].unwrap_or(usize::MAX) {
let mut scc = Vec::new();
while let Some(w) = self.stack.pop() {
self.on_stack[w] = false;
scc.push(w);
if w == v {
break;
}
}
scc.sort_unstable();
self.sccs.push(scc);
}
}
}
let n = adjacency.len();
let mut tarjan = Tarjan {
adjacency,
index: 0,
stack: Vec::new(),
on_stack: vec![false; n],
indices: vec![None; n],
lowlink: vec![0; n],
sccs: Vec::new(),
};
for v in 0..n {
if tarjan.indices[v].is_none() {
tarjan.strongconnect(v);
}
}
tarjan.sccs
}
#[derive(Debug, Clone)]
pub struct RegionOpenExplanation {
pub region_id: RegionId,
pub region_state: Option<RegionState>,
pub reasons: Vec<Reason>,
pub recommendations: Vec<String>,
}
impl fmt::Display for RegionOpenExplanation {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
writeln!(f, "Region {:?} is still open.", self.region_id)?;
if let Some(st) = self.region_state {
writeln!(f, " state: {st:?}")?;
}
for r in &self.reasons {
writeln!(f, " - {r}")?;
}
for rec in &self.recommendations {
writeln!(f, " -> {rec}")?;
}
Ok(())
}
}
#[derive(Debug, Clone)]
pub enum Reason {
RegionNotFound,
ChildRegionOpen {
child_id: RegionId,
child_state: RegionState,
},
TaskRunning {
task_id: TaskId,
task_state: String,
poll_count: u64,
},
ObligationHeld {
obligation_id: ObligationId,
obligation_type: String,
holder_task: TaskId,
},
}
impl fmt::Display for Reason {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::RegionNotFound => write!(f, "region not found"),
Self::ChildRegionOpen {
child_id,
child_state,
} => write!(f, "child region {child_id:?} still open ({child_state:?})"),
Self::TaskRunning {
task_id,
task_state,
poll_count,
} => write!(
f,
"task {task_id:?} still running (state={task_state}, polls={poll_count})"
),
Self::ObligationHeld {
obligation_id,
obligation_type,
holder_task,
} => write!(
f,
"obligation {obligation_id:?} held by task {holder_task:?} (type={obligation_type})"
),
}
}
}
#[derive(Debug, Clone)]
pub struct TaskBlockedExplanation {
pub task_id: TaskId,
pub block_reason: BlockReason,
pub details: Vec<String>,
pub recommendations: Vec<String>,
}
impl fmt::Display for TaskBlockedExplanation {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
writeln!(f, "Task {:?} blocked: {}", self.task_id, self.block_reason)?;
for d in &self.details {
writeln!(f, " - {d}")?;
}
for rec in &self.recommendations {
writeln!(f, " -> {rec}")?;
}
Ok(())
}
}
#[derive(Debug, Clone)]
pub enum BlockReason {
TaskNotFound,
NotStarted,
AwaitingSchedule,
AwaitingFuture {
description: String,
},
CancelRequested {
reason: CancelReasonInfo,
},
RunningCleanup {
reason: CancelReasonInfo,
polls_remaining: u32,
},
Finalizing {
reason: CancelReasonInfo,
polls_remaining: u32,
},
Completed,
}
impl fmt::Display for BlockReason {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::TaskNotFound => f.write_str("task not found"),
Self::NotStarted => f.write_str("not started"),
Self::AwaitingSchedule => f.write_str("awaiting schedule"),
Self::AwaitingFuture { description } => write!(f, "awaiting future ({description})"),
Self::CancelRequested { reason } => write!(f, "cancel requested ({reason})"),
Self::RunningCleanup {
reason,
polls_remaining,
} => write!(
f,
"running cleanup ({reason}, polls_remaining={polls_remaining})"
),
Self::Finalizing {
reason,
polls_remaining,
} => write!(
f,
"finalizing ({reason}, polls_remaining={polls_remaining})"
),
Self::Completed => f.write_str("completed"),
}
}
}
#[derive(Debug, Clone)]
pub struct CancellationExplanation {
pub kind: CancelKind,
pub message: Option<String>,
pub propagation_path: Vec<CancellationStep>,
}
#[derive(Debug, Clone)]
pub struct CancellationStep {
pub region_id: RegionId,
pub kind: CancelKind,
}
#[derive(Debug, Clone)]
pub struct CancelReasonInfo {
pub kind: CancelKind,
pub message: Option<String>,
}
impl CancelReasonInfo {
fn from_reason(kind: CancelKind, message: Option<&str>) -> Self {
Self {
kind,
message: message.map(sanitize_cancel_message),
}
}
}
impl fmt::Display for CancelReasonInfo {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if let Some(msg) = &self.message {
write!(f, "{} ({msg})", self.kind)
} else {
write!(f, "{}", self.kind)
}
}
}
#[derive(Debug, Clone)]
pub struct ObligationLeak {
pub obligation_id: ObligationId,
pub obligation_type: String,
pub holder_task: Option<TaskId>,
pub region_id: RegionId,
pub age: std::time::Duration,
}
pub const ADVANCED_OBSERVABILITY_CONTRACT_VERSION: &str = "doctor-observability-v1";
pub const ADVANCED_OBSERVABILITY_BASELINE_VERSION: &str = "doctor-logging-v1";
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum AdvancedEventClass {
CommandLifecycle,
IntegrationReliability,
RemediationSafety,
ReplayDeterminism,
VerificationGovernance,
}
impl AdvancedEventClass {
#[must_use]
pub const fn as_str(self) -> &'static str {
match self {
Self::CommandLifecycle => "command_lifecycle",
Self::IntegrationReliability => "integration_reliability",
Self::RemediationSafety => "remediation_safety",
Self::ReplayDeterminism => "replay_determinism",
Self::VerificationGovernance => "verification_governance",
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum AdvancedSeverity {
Info,
Warning,
Error,
Critical,
}
impl AdvancedSeverity {
#[must_use]
pub const fn as_str(self) -> &'static str {
match self {
Self::Info => "info",
Self::Warning => "warning",
Self::Error => "error",
Self::Critical => "critical",
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum TroubleshootingDimension {
CancellationPath,
ContractCompliance,
Determinism,
ExternalDependency,
OperatorAction,
RecoveryPlanning,
RuntimeInvariant,
}
impl TroubleshootingDimension {
#[must_use]
pub const fn as_str(self) -> &'static str {
match self {
Self::CancellationPath => "cancellation_path",
Self::ContractCompliance => "contract_compliance",
Self::Determinism => "determinism",
Self::ExternalDependency => "external_dependency",
Self::OperatorAction => "operator_action",
Self::RecoveryPlanning => "recovery_planning",
Self::RuntimeInvariant => "runtime_invariant",
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AdvancedEventClassSpec {
pub class_id: String,
pub description: String,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AdvancedSeveritySpec {
pub severity: String,
pub meaning: String,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TroubleshootingDimensionSpec {
pub dimension: String,
pub purpose: String,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AdvancedObservabilityContract {
pub contract_version: String,
pub baseline_contract_version: String,
pub event_classes: Vec<AdvancedEventClassSpec>,
pub severity_semantics: Vec<AdvancedSeveritySpec>,
pub troubleshooting_dimensions: Vec<TroubleshootingDimensionSpec>,
pub compatibility_notes: Vec<String>,
}
pub const TAIL_LATENCY_TAXONOMY_CONTRACT_VERSION: &str = "runtime-tail-latency-taxonomy-v1";
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TailLatencyLogFieldSpec {
pub key: String,
pub unit: String,
pub required: bool,
pub meaning: String,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TailLatencySignalSpec {
pub signal_id: String,
pub structured_log_key: String,
pub unit: String,
pub producer_kind: String,
pub producer_symbol: String,
pub producer_file: String,
pub measurement_class: String,
pub core: bool,
pub notes: String,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TailLatencyTermSpec {
pub term_id: String,
pub description: String,
pub direct_duration_key: String,
pub attribution_state_key: String,
pub signals: Vec<TailLatencySignalSpec>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TailLatencyTaxonomyContract {
pub contract_version: String,
pub equation: String,
pub total_latency_key: String,
pub unknown_bucket_key: String,
pub required_log_fields: Vec<TailLatencyLogFieldSpec>,
pub terms: Vec<TailLatencyTermSpec>,
pub sampling_policy: Vec<String>,
pub compatibility_notes: Vec<String>,
}
pub const TAIL_LATENCY_COMPACT_EVENT_SCHEMA_VERSION: &str =
"runtime-tail-causal-attribution-event-v1";
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum TailLatencyFieldValue {
Text(String),
Unsigned(u64),
}
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct TailLatencyEmitterConfig {
pub enabled: bool,
pub include_extended_allocator_bytes_live: bool,
}
impl TailLatencyEmitterConfig {
#[must_use]
pub const fn enabled_core() -> Self {
Self {
enabled: true,
include_extended_allocator_bytes_live: false,
}
}
#[must_use]
pub const fn with_extended_allocator_bytes_live(mut self) -> Self {
self.include_extended_allocator_bytes_live = true;
self
}
}
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct TailLatencyCompactSample {
pub total_latency_ns: u64,
pub ready_queue_depth: Option<u64>,
pub poll_count: Option<u64>,
pub events_received: Option<u64>,
pub retries_total_delay_ns: Option<u64>,
pub synchronization_lock_wait_ns: Option<u64>,
pub allocator_live_allocations: Option<u64>,
pub allocator_bytes_live: Option<u64>,
}
impl TailLatencyCompactSample {
#[must_use]
pub const fn new(total_latency_ns: u64) -> Self {
Self {
total_latency_ns,
ready_queue_depth: None,
poll_count: None,
events_received: None,
retries_total_delay_ns: None,
synchronization_lock_wait_ns: None,
allocator_live_allocations: None,
allocator_bytes_live: None,
}
}
#[must_use]
pub const fn with_ready_queue_depth(mut self, value: u64) -> Self {
self.ready_queue_depth = Some(value);
self
}
#[must_use]
pub const fn with_poll_count(mut self, value: u64) -> Self {
self.poll_count = Some(value);
self
}
#[must_use]
pub const fn with_events_received(mut self, value: u64) -> Self {
self.events_received = Some(value);
self
}
#[must_use]
pub const fn with_retries_total_delay_ns(mut self, value: u64) -> Self {
self.retries_total_delay_ns = Some(value);
self
}
#[must_use]
pub const fn with_synchronization_lock_wait_ns(mut self, value: u64) -> Self {
self.synchronization_lock_wait_ns = Some(value);
self
}
#[must_use]
pub const fn with_allocator_live_allocations(mut self, value: u64) -> Self {
self.allocator_live_allocations = Some(value);
self
}
#[must_use]
pub const fn with_allocator_bytes_live(mut self, value: u64) -> Self {
self.allocator_bytes_live = Some(value);
self
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TailLatencyCompactEvent {
pub schema_version: String,
pub scenario_id: String,
pub event_id: String,
pub taxonomy_version: String,
pub fields: BTreeMap<String, TailLatencyFieldValue>,
pub attribution_states: BTreeMap<String, String>,
pub missing_producers: Vec<String>,
pub unknown_unmeasured_ns: u64,
pub overhead_estimate_bytes: usize,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TailLatencyEmitError {
EmptyScenarioId,
EmptyEventId,
DirectDurationExceedsTotal {
total_latency_ns: u64,
direct_duration_ns: u64,
},
}
impl fmt::Display for TailLatencyEmitError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::EmptyScenarioId => f.write_str("tail-latency scenario id must not be empty"),
Self::EmptyEventId => f.write_str("tail-latency event id must not be empty"),
Self::DirectDurationExceedsTotal {
total_latency_ns,
direct_duration_ns,
} => write!(
f,
"tail-latency direct durations ({direct_duration_ns}ns) exceed total latency ({total_latency_ns}ns)"
),
}
}
}
impl std::error::Error for TailLatencyEmitError {}
fn push_tail_missing(
missing: &mut Vec<String>,
attribution_states: &mut BTreeMap<String, String>,
term: &str,
producer_key: &str,
) {
missing.push(producer_key.to_string());
attribution_states.insert(term.to_string(), "missing_producer".to_string());
}
fn tail_field_from_optional(
fields: &mut BTreeMap<String, TailLatencyFieldValue>,
missing: &mut Vec<String>,
attribution_states: &mut BTreeMap<String, String>,
key: &str,
term: &str,
state_when_present: &str,
value: Option<u64>,
) {
match value {
Some(value) => {
fields.insert(key.to_string(), TailLatencyFieldValue::Unsigned(value));
attribution_states.insert(term.to_string(), state_when_present.to_string());
}
None => {
fields.insert(key.to_string(), TailLatencyFieldValue::Unsigned(0));
push_tail_missing(missing, attribution_states, term, key);
}
}
}
fn tail_compact_event_overhead_estimate_bytes(
field_count: usize,
state_count: usize,
missing_count: usize,
) -> usize {
std::mem::size_of::<TailLatencyCompactSample>()
+ std::mem::size_of::<TailLatencyCompactEvent>()
+ (field_count * 96)
+ (state_count * 64)
+ (missing_count * 48)
}
pub fn emit_tail_latency_compact_event(
config: TailLatencyEmitterConfig,
scenario_id: &str,
event_id: &str,
sample: TailLatencyCompactSample,
) -> Result<Option<TailLatencyCompactEvent>, TailLatencyEmitError> {
if !config.enabled {
return Ok(None);
}
if scenario_id.trim().is_empty() {
return Err(TailLatencyEmitError::EmptyScenarioId);
}
if event_id.trim().is_empty() {
return Err(TailLatencyEmitError::EmptyEventId);
}
let direct_duration_ns = sample
.retries_total_delay_ns
.unwrap_or(0)
.saturating_add(sample.synchronization_lock_wait_ns.unwrap_or(0));
if direct_duration_ns > sample.total_latency_ns {
return Err(TailLatencyEmitError::DirectDurationExceedsTotal {
total_latency_ns: sample.total_latency_ns,
direct_duration_ns,
});
}
let unknown_unmeasured_ns = sample.total_latency_ns - direct_duration_ns;
let mut fields = BTreeMap::new();
let mut attribution_states = BTreeMap::new();
let mut missing_producers = Vec::new();
fields.insert(
"tail.contract_version".to_string(),
TailLatencyFieldValue::Text(TAIL_LATENCY_TAXONOMY_CONTRACT_VERSION.to_string()),
);
fields.insert(
"tail.total_latency_ns".to_string(),
TailLatencyFieldValue::Unsigned(sample.total_latency_ns),
);
tail_field_from_optional(
&mut fields,
&mut missing_producers,
&mut attribution_states,
"tail.queueing.ready_queue_depth",
"queueing",
"proxy_signal",
sample.ready_queue_depth,
);
tail_field_from_optional(
&mut fields,
&mut missing_producers,
&mut attribution_states,
"tail.service.poll_count",
"service",
"proxy_signal",
sample.poll_count,
);
tail_field_from_optional(
&mut fields,
&mut missing_producers,
&mut attribution_states,
"tail.io_or_network.events_received",
"io_or_network",
"proxy_signal",
sample.events_received,
);
tail_field_from_optional(
&mut fields,
&mut missing_producers,
&mut attribution_states,
"tail.retries.total_delay_ns",
"retries",
"direct_duration",
sample.retries_total_delay_ns,
);
tail_field_from_optional(
&mut fields,
&mut missing_producers,
&mut attribution_states,
"tail.synchronization.lock_wait_ns",
"synchronization",
"direct_duration",
sample.synchronization_lock_wait_ns,
);
tail_field_from_optional(
&mut fields,
&mut missing_producers,
&mut attribution_states,
"tail.allocator_or_cache.live_allocations",
"allocator_or_cache",
"proxy_signal",
sample.allocator_live_allocations,
);
fields.insert(
"tail.unknown.unmeasured_ns".to_string(),
TailLatencyFieldValue::Unsigned(unknown_unmeasured_ns),
);
attribution_states.insert(
"unknown".to_string(),
if unknown_unmeasured_ns == 0 {
"fully_attributed".to_string()
} else {
"residual".to_string()
},
);
if config.include_extended_allocator_bytes_live {
if let Some(bytes_live) = sample.allocator_bytes_live {
fields.insert(
"tail.allocator_or_cache.bytes_live".to_string(),
TailLatencyFieldValue::Unsigned(bytes_live),
);
}
}
missing_producers.sort();
missing_producers.dedup();
let overhead_estimate_bytes = tail_compact_event_overhead_estimate_bytes(
fields.len(),
attribution_states.len(),
missing_producers.len(),
);
Ok(Some(TailLatencyCompactEvent {
schema_version: TAIL_LATENCY_COMPACT_EVENT_SCHEMA_VERSION.to_string(),
scenario_id: scenario_id.trim().to_string(),
event_id: event_id.trim().to_string(),
taxonomy_version: TAIL_LATENCY_TAXONOMY_CONTRACT_VERSION.to_string(),
fields,
attribution_states,
missing_producers,
unknown_unmeasured_ns,
overhead_estimate_bytes,
}))
}
pub const TAIL_LATENCY_BUDGET_CERTIFICATE_SCHEMA_VERSION: &str =
"runtime-latency-budget-certificate-v1";
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TailLatencyBudgetVerdict {
Pass,
NoWin,
FailClosed,
}
impl TailLatencyBudgetVerdict {
#[must_use]
pub const fn as_str(self) -> &'static str {
match self {
Self::Pass => "pass",
Self::NoWin => "no_win",
Self::FailClosed => "fail_closed",
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct TailLatencyBudgetQuantiles {
pub p50_latency_ns: u64,
pub p95_latency_ns: u64,
pub p99_latency_ns: u64,
pub p999_latency_ns: u64,
}
impl TailLatencyBudgetQuantiles {
#[must_use]
pub const fn new(
p50_latency_ns: u64,
p95_latency_ns: u64,
p99_latency_ns: u64,
p999_latency_ns: u64,
) -> Self {
Self {
p50_latency_ns,
p95_latency_ns,
p99_latency_ns,
p999_latency_ns,
}
}
fn ordered(self) -> bool {
self.p50_latency_ns <= self.p95_latency_ns
&& self.p95_latency_ns <= self.p99_latency_ns
&& self.p99_latency_ns <= self.p999_latency_ns
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct TailLatencyBudgetUncertainty {
pub lower_bound_ns: u64,
pub upper_bound_ns: u64,
}
impl TailLatencyBudgetUncertainty {
#[must_use]
pub const fn new(lower_bound_ns: u64, upper_bound_ns: u64) -> Self {
Self {
lower_bound_ns,
upper_bound_ns,
}
}
fn valid(self) -> bool {
self.lower_bound_ns <= self.upper_bound_ns
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct TailLatencyBudgetGate {
pub budget_p999_latency_ns: u64,
pub min_sample_count: u64,
pub max_unknown_residual_ns: u64,
pub max_unknown_residual_basis_points: u64,
pub calibration_valid_until_epoch: u64,
pub allowed_p999_regression_ns: u64,
}
impl TailLatencyBudgetGate {
#[must_use]
pub const fn new(
budget_p999_latency_ns: u64,
min_sample_count: u64,
max_unknown_residual_ns: u64,
max_unknown_residual_basis_points: u64,
calibration_valid_until_epoch: u64,
allowed_p999_regression_ns: u64,
) -> Self {
Self {
budget_p999_latency_ns,
min_sample_count,
max_unknown_residual_ns,
max_unknown_residual_basis_points,
calibration_valid_until_epoch,
allowed_p999_regression_ns,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TailLatencyBudgetEvidence {
pub certificate_id: String,
pub scenario_id: String,
pub candidate_id: String,
pub fallback_profile: String,
pub replay_command: String,
pub sample_count: u64,
pub evidence_epoch: u64,
pub quantiles: Option<TailLatencyBudgetQuantiles>,
pub uncertainty: Option<TailLatencyBudgetUncertainty>,
pub baseline_p999_latency_ns: u64,
pub candidate_p999_latency_ns: u64,
pub gate: TailLatencyBudgetGate,
pub tail_events: Vec<TailLatencyCompactEvent>,
}
impl TailLatencyBudgetEvidence {
#[must_use]
pub fn new(
certificate_id: impl Into<String>,
scenario_id: impl Into<String>,
candidate_id: impl Into<String>,
fallback_profile: impl Into<String>,
replay_command: impl Into<String>,
gate: TailLatencyBudgetGate,
) -> Self {
Self {
certificate_id: certificate_id.into(),
scenario_id: scenario_id.into(),
candidate_id: candidate_id.into(),
fallback_profile: fallback_profile.into(),
replay_command: replay_command.into(),
sample_count: 0,
evidence_epoch: 0,
quantiles: None,
uncertainty: None,
baseline_p999_latency_ns: 0,
candidate_p999_latency_ns: 0,
gate,
tail_events: Vec::new(),
}
}
#[must_use]
pub fn with_quantiles(mut self, quantiles: TailLatencyBudgetQuantiles) -> Self {
self.quantiles = Some(quantiles);
self
}
#[must_use]
pub fn with_uncertainty(mut self, uncertainty: TailLatencyBudgetUncertainty) -> Self {
self.uncertainty = Some(uncertainty);
self
}
#[must_use]
pub fn with_sample_window(mut self, sample_count: u64, evidence_epoch: u64) -> Self {
self.sample_count = sample_count;
self.evidence_epoch = evidence_epoch;
self
}
#[must_use]
pub fn with_regression_window(
mut self,
baseline_p999_latency_ns: u64,
candidate_p999_latency_ns: u64,
) -> Self {
self.baseline_p999_latency_ns = baseline_p999_latency_ns;
self.candidate_p999_latency_ns = candidate_p999_latency_ns;
self
}
#[must_use]
pub fn with_tail_events(mut self, tail_events: Vec<TailLatencyCompactEvent>) -> Self {
self.tail_events = tail_events;
self
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TailLatencyBudgetTermEvidence {
pub term_id: String,
pub field_key: String,
pub measurement_class: String,
pub attribution_state: String,
pub observed_value: u64,
pub direct_duration_ns: u64,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TailLatencyBudgetCertificate {
pub schema_version: String,
pub certificate_id: String,
pub certificate_hash: String,
pub scenario_id: String,
pub candidate_id: String,
pub fallback_profile: String,
pub taxonomy_version: String,
pub verdict: TailLatencyBudgetVerdict,
pub reason_codes: Vec<String>,
pub fallback_reason: Option<String>,
pub sample_count: u64,
pub p50_latency_ns: Option<u64>,
pub p95_latency_ns: Option<u64>,
pub p99_latency_ns: Option<u64>,
pub p999_latency_ns: Option<u64>,
pub uncertainty_lower_bound_ns: Option<u64>,
pub uncertainty_upper_bound_ns: Option<u64>,
pub budget_p999_latency_ns: u64,
pub unknown_residual_ns: u64,
pub unknown_residual_basis_points: u64,
pub term_breakdown: Vec<TailLatencyBudgetTermEvidence>,
pub replay_command: String,
}
fn tail_field_unsigned(event: &TailLatencyCompactEvent, key: &str) -> Option<u64> {
match event.fields.get(key) {
Some(TailLatencyFieldValue::Unsigned(value)) => Some(*value),
_ => None,
}
}
fn tail_term_field_key(term_id: &str) -> &'static str {
match term_id {
"queueing" => "tail.queueing.ready_queue_depth",
"service" => "tail.service.poll_count",
"io_or_network" => "tail.io_or_network.events_received",
"retries" => "tail.retries.total_delay_ns",
"synchronization" => "tail.synchronization.lock_wait_ns",
"allocator_or_cache" => "tail.allocator_or_cache.live_allocations",
"unknown" => "tail.unknown.unmeasured_ns",
_ => "tail.unknown.unmeasured_ns",
}
}
fn tail_term_measurement_class(term_id: &str) -> &'static str {
match term_id {
"retries" | "synchronization" => "direct_duration",
"unknown" => "unknown_bucket",
_ => "proxy_signal",
}
}
fn unique_push_reason(reasons: &mut Vec<String>, reason: impl Into<String>) {
let reason = reason.into();
if !reasons.contains(&reason) {
reasons.push(reason);
}
}
fn stable_fnv1a64_hex(bytes: &[u8]) -> String {
const OFFSET: u64 = 0xcbf29ce484222325;
const PRIME: u64 = 0x100000001b3;
let mut hash = OFFSET;
for byte in bytes {
hash ^= u64::from(*byte);
hash = hash.wrapping_mul(PRIME);
}
format!("fnv1a64:{hash:016x}")
}
fn tail_latency_budget_certificate_hash(certificate: &TailLatencyBudgetCertificate) -> String {
let mut projection = String::new();
projection.push_str(&certificate.schema_version);
projection.push('|');
projection.push_str(&certificate.certificate_id);
projection.push('|');
projection.push_str(&certificate.scenario_id);
projection.push('|');
projection.push_str(&certificate.candidate_id);
projection.push('|');
projection.push_str(certificate.verdict.as_str());
projection.push('|');
projection.push_str(&certificate.sample_count.to_string());
projection.push('|');
projection.push_str(&certificate.unknown_residual_ns.to_string());
projection.push('|');
projection.push_str(&certificate.unknown_residual_basis_points.to_string());
for reason in &certificate.reason_codes {
projection.push('|');
projection.push_str(reason);
}
for term in &certificate.term_breakdown {
projection.push('|');
projection.push_str(&term.term_id);
projection.push('=');
projection.push_str(&term.measurement_class);
projection.push(':');
projection.push_str(&term.attribution_state);
projection.push(':');
projection.push_str(&term.observed_value.to_string());
projection.push(':');
projection.push_str(&term.direct_duration_ns.to_string());
}
stable_fnv1a64_hex(projection.as_bytes())
}
#[must_use]
pub fn verify_tail_latency_budget_certificate(
evidence: TailLatencyBudgetEvidence,
) -> TailLatencyBudgetCertificate {
let contract = tail_latency_taxonomy_contract();
let mut fail_reasons = Vec::new();
let mut no_win_reasons = Vec::new();
let mut direct_duration_seen = false;
let mut unknown_residual_ns = 0_u64;
let mut total_latency_ns = 0_u64;
if evidence.certificate_id.trim().is_empty() {
unique_push_reason(&mut fail_reasons, "empty_certificate_id");
}
if evidence.scenario_id.trim().is_empty() {
unique_push_reason(&mut fail_reasons, "empty_scenario_id");
}
if evidence.candidate_id.trim().is_empty() {
unique_push_reason(&mut fail_reasons, "empty_candidate_id");
}
if evidence.fallback_profile.trim().is_empty() {
unique_push_reason(&mut fail_reasons, "empty_fallback_profile");
}
if evidence.replay_command.trim().is_empty() {
unique_push_reason(&mut fail_reasons, "missing_replay_command");
}
if evidence.tail_events.is_empty() {
unique_push_reason(&mut fail_reasons, "missing_tail_events");
}
if evidence.sample_count < evidence.gate.min_sample_count {
unique_push_reason(&mut fail_reasons, "insufficient_sample_count");
}
if evidence.evidence_epoch > evidence.gate.calibration_valid_until_epoch {
unique_push_reason(&mut fail_reasons, "stale_calibration");
}
let quantiles = evidence.quantiles;
if let Some(quantiles) = quantiles {
if !quantiles.ordered() {
unique_push_reason(&mut fail_reasons, "unordered_quantiles");
}
} else {
unique_push_reason(&mut fail_reasons, "missing_quantiles_mean_only_evidence");
}
let uncertainty = evidence.uncertainty;
if let Some(uncertainty) = uncertainty {
if !uncertainty.valid() {
unique_push_reason(&mut fail_reasons, "invalid_uncertainty_bounds");
}
} else {
unique_push_reason(&mut fail_reasons, "missing_uncertainty_bounds");
}
let term_ids: Vec<&str> = contract
.terms
.iter()
.map(|term| term.term_id.as_str())
.collect();
let required_keys: Vec<&str> = contract
.required_log_fields
.iter()
.map(|field| field.key.as_str())
.collect();
let mut term_breakdown = Vec::with_capacity(term_ids.len());
for event in &evidence.tail_events {
if event.schema_version != TAIL_LATENCY_COMPACT_EVENT_SCHEMA_VERSION {
unique_push_reason(&mut fail_reasons, "wrong_tail_event_schema");
}
if event.taxonomy_version != TAIL_LATENCY_TAXONOMY_CONTRACT_VERSION {
unique_push_reason(&mut fail_reasons, "wrong_tail_taxonomy_version");
}
for key in &required_keys {
if !event.fields.contains_key(*key) {
unique_push_reason(&mut fail_reasons, format!("missing_required_field:{key}"));
}
}
for term_id in &term_ids {
if !event.attribution_states.contains_key(*term_id) {
unique_push_reason(&mut fail_reasons, format!("missing_term:{term_id}"));
}
}
let event_total = tail_field_unsigned(event, "tail.total_latency_ns").unwrap_or(0);
total_latency_ns = total_latency_ns.saturating_add(event_total);
let unknown_field = tail_field_unsigned(event, "tail.unknown.unmeasured_ns");
if unknown_field != Some(event.unknown_unmeasured_ns) {
unique_push_reason(&mut fail_reasons, "hidden_unknown_bucket");
}
if !event.missing_producers.is_empty() && event.unknown_unmeasured_ns == 0 {
unique_push_reason(&mut fail_reasons, "hidden_unknown_bucket");
}
unknown_residual_ns = unknown_residual_ns.saturating_add(event.unknown_unmeasured_ns);
let retries_direct = tail_field_unsigned(event, "tail.retries.total_delay_ns").unwrap_or(0);
let sync_direct =
tail_field_unsigned(event, "tail.synchronization.lock_wait_ns").unwrap_or(0);
direct_duration_seen |= retries_direct > 0 || sync_direct > 0;
}
if !evidence.tail_events.is_empty() && !direct_duration_seen {
unique_push_reason(&mut fail_reasons, "proxy_only_green_row");
}
for term_id in &term_ids {
let field_key = tail_term_field_key(term_id);
let measurement_class = tail_term_measurement_class(term_id);
let mut observed_value = 0_u64;
let mut direct_duration_ns = 0_u64;
let mut state = "absent";
let mut missing = false;
for event in &evidence.tail_events {
observed_value =
observed_value.saturating_add(tail_field_unsigned(event, field_key).unwrap_or(0));
if measurement_class == "direct_duration" {
direct_duration_ns = direct_duration_ns
.saturating_add(tail_field_unsigned(event, field_key).unwrap_or(0));
}
match event.attribution_states.get(*term_id).map(String::as_str) {
Some("missing_producer") => missing = true,
Some(current) if state == "absent" => state = current,
Some(current) if state != current => state = "mixed",
_ => {}
}
}
if missing {
state = "missing_producer";
}
term_breakdown.push(TailLatencyBudgetTermEvidence {
term_id: (*term_id).to_string(),
field_key: field_key.to_string(),
measurement_class: measurement_class.to_string(),
attribution_state: state.to_string(),
observed_value,
direct_duration_ns,
});
}
let unknown_residual_basis_points = if total_latency_ns == 0 {
0
} else {
unknown_residual_ns
.saturating_mul(10_000)
.saturating_div(total_latency_ns)
};
if unknown_residual_ns > evidence.gate.max_unknown_residual_ns {
unique_push_reason(&mut no_win_reasons, "unknown_residual_above_limit");
}
if unknown_residual_basis_points > evidence.gate.max_unknown_residual_basis_points {
unique_push_reason(&mut no_win_reasons, "unknown_fraction_above_limit");
}
if let (Some(quantiles), Some(uncertainty)) = (quantiles, uncertainty) {
let conservative_p999 = quantiles
.p999_latency_ns
.saturating_add(uncertainty.upper_bound_ns);
if conservative_p999 > evidence.gate.budget_p999_latency_ns {
unique_push_reason(&mut no_win_reasons, "p999_budget_exceeded");
}
let allowed_candidate_p999 = evidence
.baseline_p999_latency_ns
.saturating_add(evidence.gate.allowed_p999_regression_ns);
if evidence.candidate_p999_latency_ns > allowed_candidate_p999 {
unique_push_reason(&mut no_win_reasons, "asymmetric_regression_gate");
}
}
let (verdict, reason_codes) = if fail_reasons.is_empty() {
if no_win_reasons.is_empty() {
(TailLatencyBudgetVerdict::Pass, Vec::new())
} else {
(TailLatencyBudgetVerdict::NoWin, no_win_reasons)
}
} else {
(TailLatencyBudgetVerdict::FailClosed, fail_reasons)
};
let fallback_reason = reason_codes.first().cloned();
let mut certificate = TailLatencyBudgetCertificate {
schema_version: TAIL_LATENCY_BUDGET_CERTIFICATE_SCHEMA_VERSION.to_string(),
certificate_id: evidence.certificate_id.trim().to_string(),
certificate_hash: String::new(),
scenario_id: evidence.scenario_id.trim().to_string(),
candidate_id: evidence.candidate_id.trim().to_string(),
fallback_profile: evidence.fallback_profile.trim().to_string(),
taxonomy_version: TAIL_LATENCY_TAXONOMY_CONTRACT_VERSION.to_string(),
verdict,
reason_codes,
fallback_reason,
sample_count: evidence.sample_count,
p50_latency_ns: quantiles.map(|value| value.p50_latency_ns),
p95_latency_ns: quantiles.map(|value| value.p95_latency_ns),
p99_latency_ns: quantiles.map(|value| value.p99_latency_ns),
p999_latency_ns: quantiles.map(|value| value.p999_latency_ns),
uncertainty_lower_bound_ns: uncertainty.map(|value| value.lower_bound_ns),
uncertainty_upper_bound_ns: uncertainty.map(|value| value.upper_bound_ns),
budget_p999_latency_ns: evidence.gate.budget_p999_latency_ns,
unknown_residual_ns,
unknown_residual_basis_points,
term_breakdown,
replay_command: evidence.replay_command,
};
certificate.certificate_hash = tail_latency_budget_certificate_hash(&certificate);
certificate
}
pub const WAIT_CAUSE_REMEDIATION_REPORT_SCHEMA_VERSION: &str =
"runtime-wait-cause-remediation-report-v1";
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum WaitCauseRemediationVerdict {
Actionable,
Investigate,
Refused,
}
impl WaitCauseRemediationVerdict {
#[must_use]
pub const fn as_str(self) -> &'static str {
match self {
Self::Actionable => "actionable",
Self::Investigate => "investigate",
Self::Refused => "refused",
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum WaitCauseCategory {
DeadlockCycle,
Futurelock,
ObligationLeak,
UnknownWait,
}
impl WaitCauseCategory {
#[must_use]
pub const fn as_str(self) -> &'static str {
match self {
Self::DeadlockCycle => "deadlock_cycle",
Self::Futurelock => "futurelock",
Self::ObligationLeak => "obligation_leak",
Self::UnknownWait => "unknown_wait",
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum WaitCauseSeverity {
Critical,
High,
Warning,
Info,
}
impl WaitCauseSeverity {
#[must_use]
pub const fn as_str(self) -> &'static str {
match self {
Self::Critical => "critical",
Self::High => "high",
Self::Warning => "warning",
Self::Info => "info",
}
}
const fn sort_rank(self) -> u8 {
match self {
Self::Critical => 4,
Self::High => 3,
Self::Warning => 2,
Self::Info => 1,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WaitCauseTaskWaitKind {
AwaitingFuture,
AwaitingSchedule,
Unknown,
}
impl WaitCauseTaskWaitKind {
#[must_use]
pub const fn as_str(self) -> &'static str {
match self {
Self::AwaitingFuture => "awaiting_future",
Self::AwaitingSchedule => "awaiting_schedule",
Self::Unknown => "unknown",
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WaitCauseTaskEvidence {
pub task_id: TaskId,
pub region_id: Option<RegionId>,
pub wait_kind: WaitCauseTaskWaitKind,
pub blocked_resource: String,
pub wait_age_ns: u64,
pub wake_pending: bool,
pub details: Vec<String>,
}
impl WaitCauseTaskEvidence {
#[must_use]
pub fn new(
task_id: TaskId,
region_id: Option<RegionId>,
wait_kind: WaitCauseTaskWaitKind,
blocked_resource: impl Into<String>,
) -> Self {
Self {
task_id,
region_id,
wait_kind,
blocked_resource: blocked_resource.into(),
wait_age_ns: 0,
wake_pending: false,
details: Vec::new(),
}
}
#[must_use]
pub fn from_task_blocked(
explanation: &TaskBlockedExplanation,
region_id: Option<RegionId>,
wait_age_ns: u64,
wake_pending: bool,
) -> Self {
let wait_kind = match &explanation.block_reason {
BlockReason::AwaitingFuture { .. } => WaitCauseTaskWaitKind::AwaitingFuture,
BlockReason::AwaitingSchedule => WaitCauseTaskWaitKind::AwaitingSchedule,
_ => WaitCauseTaskWaitKind::Unknown,
};
let blocked_resource = sanitize_remediation_text(&explanation.block_reason.to_string());
Self {
task_id: explanation.task_id,
region_id,
wait_kind,
blocked_resource,
wait_age_ns,
wake_pending,
details: explanation
.details
.iter()
.map(|detail| sanitize_remediation_text(detail))
.collect(),
}
}
#[must_use]
pub const fn with_wait_age_ns(mut self, wait_age_ns: u64) -> Self {
self.wait_age_ns = wait_age_ns;
self
}
#[must_use]
pub const fn with_wake_pending(mut self, wake_pending: bool) -> Self {
self.wake_pending = wake_pending;
self
}
#[must_use]
pub fn with_details(mut self, details: Vec<String>) -> Self {
self.details = details
.into_iter()
.map(|detail| sanitize_remediation_text(&detail))
.collect();
self
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WaitCauseObligationEvidence {
pub obligation_id: ObligationId,
pub obligation_type: String,
pub holder_task: Option<TaskId>,
pub region_id: RegionId,
pub age_ns: u64,
}
impl WaitCauseObligationEvidence {
#[must_use]
pub fn new(
obligation_id: ObligationId,
obligation_type: impl Into<String>,
holder_task: Option<TaskId>,
region_id: RegionId,
age_ns: u64,
) -> Self {
Self {
obligation_id,
obligation_type: sanitize_remediation_text(&obligation_type.into()),
holder_task,
region_id,
age_ns,
}
}
#[must_use]
pub fn from_obligation_leak(leak: &ObligationLeak) -> Self {
let nanos = u64::try_from(leak.age.as_nanos()).unwrap_or(u64::MAX);
Self::new(
leak.obligation_id,
leak.obligation_type.clone(),
leak.holder_task,
leak.region_id,
nanos,
)
}
}
#[derive(Debug, Clone)]
pub struct WaitCauseRemediationEvidence {
pub report_id: String,
pub scenario_id: String,
pub replay_command: String,
pub tail_taxonomy_version: String,
pub deadlock_report: Option<DirectionalDeadlockReport>,
pub task_waits: Vec<WaitCauseTaskEvidence>,
pub obligation_leaks: Vec<WaitCauseObligationEvidence>,
pub evidence_refs: Vec<String>,
}
impl WaitCauseRemediationEvidence {
#[must_use]
pub fn new(
report_id: impl Into<String>,
scenario_id: impl Into<String>,
replay_command: impl Into<String>,
) -> Self {
Self {
report_id: report_id.into(),
scenario_id: scenario_id.into(),
replay_command: replay_command.into(),
tail_taxonomy_version: TAIL_LATENCY_TAXONOMY_CONTRACT_VERSION.to_string(),
deadlock_report: None,
task_waits: Vec::new(),
obligation_leaks: Vec::new(),
evidence_refs: Vec::new(),
}
}
#[must_use]
pub fn with_tail_taxonomy_version(mut self, version: impl Into<String>) -> Self {
self.tail_taxonomy_version = version.into();
self
}
#[must_use]
pub fn with_deadlock_report(mut self, report: DirectionalDeadlockReport) -> Self {
self.deadlock_report = Some(report);
self
}
#[must_use]
pub fn with_task_waits(mut self, waits: Vec<WaitCauseTaskEvidence>) -> Self {
self.task_waits = waits;
self
}
#[must_use]
pub fn with_obligation_leaks(mut self, leaks: Vec<WaitCauseObligationEvidence>) -> Self {
self.obligation_leaks = leaks;
self
}
#[must_use]
pub fn with_evidence_refs(mut self, refs: Vec<String>) -> Self {
self.evidence_refs = refs
.into_iter()
.map(|reference| sanitize_cancel_message(&reference))
.collect();
self
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct WaitCauseRemediationFinding {
pub finding_id: String,
pub rank: usize,
pub category: WaitCauseCategory,
pub severity: WaitCauseSeverity,
pub confidence_basis_points: u16,
pub reason_code: String,
pub summary: String,
pub blocked_resource: String,
pub owner_task_id: Option<String>,
pub owner_region_id: Option<String>,
pub evidence_refs: Vec<String>,
pub safe_actions: Vec<String>,
pub forbidden_actions: Vec<String>,
pub replay_command: String,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct WaitCauseRemediationReport {
pub schema_version: String,
pub report_id: String,
pub report_hash: String,
pub scenario_id: String,
pub wait_cause_graph_hash: String,
pub tail_taxonomy_version: String,
pub verdict: WaitCauseRemediationVerdict,
pub refusal_reason: Option<String>,
pub findings: Vec<WaitCauseRemediationFinding>,
pub safe_actions: Vec<String>,
pub forbidden_action_disclaimer: String,
pub replay_command: String,
pub evidence_refs: Vec<String>,
}
fn sanitize_remediation_text(input: &str) -> String {
let sanitized = sanitize_cancel_message(input);
let mut tokens = Vec::new();
for token in sanitized.split_whitespace() {
if token.contains('/') || token.contains('\\') {
tokens.push("[redacted-path]".to_string());
} else if token.contains('@') {
tokens.push("[redacted-identity]".to_string());
} else {
tokens.push(token.to_string());
}
}
if tokens.is_empty() {
String::new()
} else {
tokens.join(" ")
}
}
fn remediation_forbidden_actions() -> Vec<String> {
vec![
"Do not delete files, artifacts, or trace evidence while diagnosing the wait.".to_string(),
"Do not reset git state or discard unrelated work to clear a stall.".to_string(),
"Do not kill unknown tasks or processes until ownership and replay evidence are recorded."
.to_string(),
]
}
fn deadlock_safe_actions(trapped: bool) -> Vec<String> {
let mut actions = vec![
"Capture the wait-graph artifact before changing runtime state.".to_string(),
"Replay and minimize the cycle before widening the investigation.".to_string(),
];
if trapped {
actions.push(
"Cancel the smallest non-critical owning region only after drain ownership is known."
.to_string(),
);
} else {
actions.push("Inspect the egress edge before forcing cancellation.".to_string());
}
actions
}
fn futurelock_safe_actions() -> Vec<String> {
vec![
"Inspect the awaited producer and wake path for the blocked task.".to_string(),
"Use the replay command before adding timeouts or policy changes.".to_string(),
"Cancel the owning region only if the task is non-critical and drain evidence is recorded."
.to_string(),
]
}
fn obligation_safe_actions() -> Vec<String> {
vec![
"Resolve the named obligation through its owning protocol commit or abort path."
.to_string(),
"Inspect the holder task before closing the region.".to_string(),
"Verify drain and finalize paths clear the obligation in replay.".to_string(),
]
}
fn unknown_wait_safe_actions() -> Vec<String> {
vec![
"Collect a fresh diagnostics snapshot with task, region, and obligation rows.".to_string(),
"Preserve replay artifacts before attempting remediation.".to_string(),
"Escalate to focused instrumentation instead of guessing at destructive cleanup."
.to_string(),
]
}
fn wait_cause_graph_hash(evidence: &WaitCauseRemediationEvidence) -> String {
let mut projection = String::new();
projection.push_str(&evidence.report_id);
projection.push('|');
projection.push_str(&evidence.scenario_id);
projection.push('|');
projection.push_str(&evidence.tail_taxonomy_version);
if let Some(report) = &evidence.deadlock_report {
projection.push_str("|deadlock:");
projection.push_str(report.severity.as_str());
projection.push(':');
projection.push_str(&format!("{:.6}", report.risk_score));
for cycle in &report.cycles {
projection.push('|');
projection.push_str(if cycle.trapped { "trapped" } else { "egress" });
projection.push(':');
projection.push_str(&cycle.ingress_edges.to_string());
projection.push(':');
projection.push_str(&cycle.egress_edges.to_string());
for task in &cycle.tasks {
projection.push(':');
projection.push_str(&format!("{task:?}"));
}
}
}
for wait in &evidence.task_waits {
projection.push_str("|task:");
projection.push_str(&format!("{:?}", wait.task_id));
projection.push(':');
projection.push_str(wait.wait_kind.as_str());
projection.push(':');
projection.push_str(&wait.wait_age_ns.to_string());
projection.push(':');
projection.push_str(if wait.wake_pending { "wake" } else { "no_wake" });
projection.push(':');
projection.push_str(&sanitize_remediation_text(&wait.blocked_resource));
}
for leak in &evidence.obligation_leaks {
projection.push_str("|obligation:");
projection.push_str(&format!("{:?}", leak.obligation_id));
projection.push(':');
projection.push_str(&sanitize_remediation_text(&leak.obligation_type));
projection.push(':');
projection.push_str(&format!("{:?}", leak.region_id));
projection.push(':');
projection.push_str(&leak.age_ns.to_string());
}
stable_fnv1a64_hex(projection.as_bytes())
}
impl DeadlockSeverity {
fn as_str(self) -> &'static str {
match self {
Self::None => "none",
Self::Elevated => "elevated",
Self::Critical => "critical",
}
}
}
fn push_deadlock_findings(
findings: &mut Vec<WaitCauseRemediationFinding>,
report: &DirectionalDeadlockReport,
evidence: &WaitCauseRemediationEvidence,
) {
for cycle in &report.cycles {
let severity = if cycle.trapped {
WaitCauseSeverity::Critical
} else {
WaitCauseSeverity::High
};
let confidence_basis_points = if cycle.trapped { 9_600 } else { 8_100 };
let reason_code = if cycle.trapped {
"trapped_wait_cycle"
} else {
"cyclic_wait_with_egress"
};
let blocked_resource = sanitize_remediation_text(&format!(
"wait_graph_cycle:{}:{}",
if cycle.trapped { "trapped" } else { "egress" },
cycle
.tasks
.iter()
.map(|task| format!("{task:?}"))
.collect::<Vec<_>>()
.join("->")
));
findings.push(WaitCauseRemediationFinding {
finding_id: String::new(),
rank: 0,
category: WaitCauseCategory::DeadlockCycle,
severity,
confidence_basis_points,
reason_code: reason_code.to_string(),
summary: format!(
"{} wait-for cycle across {} task(s)",
if cycle.trapped {
"trapped"
} else {
"egress-capable"
},
cycle.tasks.len()
),
blocked_resource,
owner_task_id: cycle.tasks.first().map(|task| format!("{task:?}")),
owner_region_id: None,
evidence_refs: evidence.evidence_refs.clone(),
safe_actions: deadlock_safe_actions(cycle.trapped),
forbidden_actions: remediation_forbidden_actions(),
replay_command: evidence.replay_command.clone(),
});
}
}
fn push_task_wait_findings(
findings: &mut Vec<WaitCauseRemediationFinding>,
evidence: &WaitCauseRemediationEvidence,
) {
for wait in &evidence.task_waits {
let (category, severity, confidence_basis_points, reason_code, safe_actions) =
match (wait.wait_kind, wait.wake_pending) {
(WaitCauseTaskWaitKind::AwaitingFuture, false) => (
WaitCauseCategory::Futurelock,
WaitCauseSeverity::High,
8_200,
"future_wait_without_pending_wake",
futurelock_safe_actions(),
),
(WaitCauseTaskWaitKind::AwaitingFuture, true) => (
WaitCauseCategory::UnknownWait,
WaitCauseSeverity::Warning,
5_200,
"future_wait_has_pending_wake",
unknown_wait_safe_actions(),
),
(WaitCauseTaskWaitKind::AwaitingSchedule, _) => (
WaitCauseCategory::UnknownWait,
WaitCauseSeverity::Info,
4_200,
"awaiting_scheduler_turn",
unknown_wait_safe_actions(),
),
(WaitCauseTaskWaitKind::Unknown, _) => (
WaitCauseCategory::UnknownWait,
WaitCauseSeverity::Warning,
4_800,
"unknown_wait_cause",
unknown_wait_safe_actions(),
),
};
let blocked_resource = sanitize_remediation_text(&wait.blocked_resource);
findings.push(WaitCauseRemediationFinding {
finding_id: String::new(),
rank: 0,
category,
severity,
confidence_basis_points,
reason_code: reason_code.to_string(),
summary: format!(
"task {:?} wait classified as {} for {}ns",
wait.task_id,
category.as_str(),
wait.wait_age_ns
),
blocked_resource: if blocked_resource.is_empty() {
"unknown_resource".to_string()
} else {
blocked_resource
},
owner_task_id: Some(format!("{:?}", wait.task_id)),
owner_region_id: wait.region_id.map(|region| format!("{region:?}")),
evidence_refs: evidence.evidence_refs.clone(),
safe_actions,
forbidden_actions: remediation_forbidden_actions(),
replay_command: evidence.replay_command.clone(),
});
}
}
fn push_obligation_findings(
findings: &mut Vec<WaitCauseRemediationFinding>,
evidence: &WaitCauseRemediationEvidence,
) {
for leak in &evidence.obligation_leaks {
let obligation_type = sanitize_remediation_text(&leak.obligation_type);
findings.push(WaitCauseRemediationFinding {
finding_id: String::new(),
rank: 0,
category: WaitCauseCategory::ObligationLeak,
severity: WaitCauseSeverity::Critical,
confidence_basis_points: 9_000,
reason_code: "reserved_obligation_still_held".to_string(),
summary: format!(
"obligation {:?} ({}) held for {}ns",
leak.obligation_id, obligation_type, leak.age_ns
),
blocked_resource: format!("obligation:{:?}", leak.obligation_id),
owner_task_id: leak.holder_task.map(|task| format!("{task:?}")),
owner_region_id: Some(format!("{:?}", leak.region_id)),
evidence_refs: evidence.evidence_refs.clone(),
safe_actions: obligation_safe_actions(),
forbidden_actions: remediation_forbidden_actions(),
replay_command: evidence.replay_command.clone(),
});
}
}
fn rank_wait_cause_findings(findings: &mut [WaitCauseRemediationFinding]) {
findings.sort_by(|a, b| {
b.severity
.sort_rank()
.cmp(&a.severity.sort_rank())
.then_with(|| b.confidence_basis_points.cmp(&a.confidence_basis_points))
.then_with(|| a.category.cmp(&b.category))
.then_with(|| a.blocked_resource.cmp(&b.blocked_resource))
.then_with(|| a.owner_task_id.cmp(&b.owner_task_id))
});
for (idx, finding) in findings.iter_mut().enumerate() {
finding.rank = idx + 1;
finding.finding_id = format!("wait-cause-remediation-{:04}", idx + 1);
}
}
fn wait_cause_report_hash(report: &WaitCauseRemediationReport) -> String {
let mut projection = String::new();
projection.push_str(&report.schema_version);
projection.push('|');
projection.push_str(&report.report_id);
projection.push('|');
projection.push_str(&report.scenario_id);
projection.push('|');
projection.push_str(report.verdict.as_str());
projection.push('|');
projection.push_str(&report.wait_cause_graph_hash);
if let Some(reason) = &report.refusal_reason {
projection.push('|');
projection.push_str(reason);
}
for finding in &report.findings {
projection.push('|');
projection.push_str(&finding.finding_id);
projection.push(':');
projection.push_str(finding.category.as_str());
projection.push(':');
projection.push_str(finding.severity.as_str());
projection.push(':');
projection.push_str(&finding.confidence_basis_points.to_string());
projection.push(':');
projection.push_str(&finding.reason_code);
}
stable_fnv1a64_hex(projection.as_bytes())
}
fn flattened_safe_actions(findings: &[WaitCauseRemediationFinding]) -> Vec<String> {
let mut actions = Vec::new();
for finding in findings {
for action in &finding.safe_actions {
if !actions.contains(action) {
actions.push(action.clone());
}
}
}
actions
}
#[must_use]
pub fn build_wait_cause_remediation_report(
evidence: WaitCauseRemediationEvidence,
) -> WaitCauseRemediationReport {
let mut refusal_reasons = Vec::new();
if evidence.report_id.trim().is_empty() {
unique_push_reason(&mut refusal_reasons, "empty_report_id");
}
if evidence.scenario_id.trim().is_empty() {
unique_push_reason(&mut refusal_reasons, "empty_scenario_id");
}
if evidence.replay_command.trim().is_empty() {
unique_push_reason(&mut refusal_reasons, "missing_replay_command");
}
if evidence.tail_taxonomy_version != TAIL_LATENCY_TAXONOMY_CONTRACT_VERSION {
unique_push_reason(&mut refusal_reasons, "wrong_tail_taxonomy_version");
}
let graph_hash = wait_cause_graph_hash(&evidence);
let mut findings = Vec::new();
if let Some(report) = &evidence.deadlock_report {
push_deadlock_findings(&mut findings, report, &evidence);
}
push_task_wait_findings(&mut findings, &evidence);
push_obligation_findings(&mut findings, &evidence);
rank_wait_cause_findings(&mut findings);
if findings.is_empty() {
unique_push_reason(&mut refusal_reasons, "no_wait_cause_evidence");
}
let verdict = if refusal_reasons.is_empty() {
if findings
.iter()
.all(|finding| finding.category == WaitCauseCategory::UnknownWait)
{
WaitCauseRemediationVerdict::Investigate
} else {
WaitCauseRemediationVerdict::Actionable
}
} else {
findings.clear();
WaitCauseRemediationVerdict::Refused
};
let mut report = WaitCauseRemediationReport {
schema_version: WAIT_CAUSE_REMEDIATION_REPORT_SCHEMA_VERSION.to_string(),
report_id: evidence.report_id.trim().to_string(),
report_hash: String::new(),
scenario_id: evidence.scenario_id.trim().to_string(),
wait_cause_graph_hash: graph_hash,
tail_taxonomy_version: evidence.tail_taxonomy_version,
verdict,
refusal_reason: refusal_reasons.first().cloned(),
safe_actions: flattened_safe_actions(&findings),
findings,
forbidden_action_disclaimer:
"All remediation actions must preserve evidence and avoid destructive cleanup unless explicitly authorized."
.to_string(),
replay_command: evidence.replay_command,
evidence_refs: evidence.evidence_refs,
};
report.report_hash = wait_cause_report_hash(&report);
report
}
fn tail_latency_log_field(
key: &str,
unit: &str,
required: bool,
meaning: &str,
) -> TailLatencyLogFieldSpec {
TailLatencyLogFieldSpec {
key: key.to_string(),
unit: unit.to_string(),
required,
meaning: meaning.to_string(),
}
}
#[allow(clippy::too_many_arguments)]
fn tail_latency_signal(
signal_id: &str,
structured_log_key: &str,
unit: &str,
producer_kind: &str,
producer_symbol: &str,
producer_file: &str,
measurement_class: &str,
core: bool,
notes: &str,
) -> TailLatencySignalSpec {
TailLatencySignalSpec {
signal_id: signal_id.to_string(),
structured_log_key: structured_log_key.to_string(),
unit: unit.to_string(),
producer_kind: producer_kind.to_string(),
producer_symbol: producer_symbol.to_string(),
producer_file: producer_file.to_string(),
measurement_class: measurement_class.to_string(),
core,
notes: notes.to_string(),
}
}
fn queueing_tail_latency_term() -> TailLatencyTermSpec {
TailLatencyTermSpec {
term_id: "queueing".to_string(),
description:
"Backlog before useful work begins, spanning ready queues, waiters, and drain queues."
.to_string(),
direct_duration_key: "tail.queueing.ns".to_string(),
attribution_state_key: "tail.queueing.attribution_state".to_string(),
signals: vec![
tail_latency_signal(
"queueing.ready_queue_depth",
"tail.queueing.ready_queue_depth",
"count",
"snapshot_field",
"asupersync::obligation::lyapunov::StateSnapshot::ready_queue_depth",
"src/obligation/lyapunov.rs",
"proxy_signal",
true,
"Canonical scheduler backlog proxy used by the three-lane decision contract.",
),
tail_latency_signal(
"queueing.draining_regions",
"tail.queueing.draining_regions",
"count",
"snapshot_field",
"asupersync::obligation::lyapunov::StateSnapshot::draining_regions",
"src/obligation/lyapunov.rs",
"proxy_signal",
true,
"Captures cancellation/finalizer drain backlog that elongates queueing tails.",
),
tail_latency_signal(
"queueing.bulkhead_queue_depth",
"tail.queueing.bulkhead_queue_depth",
"count",
"stats_struct",
"asupersync::combinator::bulkhead::BulkheadMetrics::queue_depth",
"src/combinator/bulkhead.rs",
"proxy_signal",
false,
"Extended queueing proxy for admission-controlled bulkhead lanes.",
),
tail_latency_signal(
"queueing.pool_waiters",
"tail.queueing.pool_waiters",
"count",
"stats_struct",
"asupersync::sync::pool::PoolStats::waiters",
"src/sync/pool.rs",
"proxy_signal",
false,
"Extended backlog proxy for pool acquisition queues.",
),
],
}
}
fn service_tail_latency_term() -> TailLatencyTermSpec {
TailLatencyTermSpec {
term_id: "service".to_string(),
description:
"CPU work once the task is scheduled, including poll consumption and budget burn."
.to_string(),
direct_duration_key: "tail.service.ns".to_string(),
attribution_state_key: "tail.service.attribution_state".to_string(),
signals: vec![
tail_latency_signal(
"service.poll_count",
"tail.service.poll_count",
"count",
"snapshot_field",
"asupersync::runtime::state::TaskSnapshot::poll_count",
"src/runtime/state.rs",
"proxy_signal",
true,
"Canonical always-on service proxy derived from task budget consumption.",
),
tail_latency_signal(
"service.poll_quota_consumed",
"tail.service.poll_quota_consumed",
"quota_units",
"stats_struct",
"asupersync::observability::resource_accounting::ResourceAccountingSnapshot::poll_quota_consumed",
"src/observability/resource_accounting.rs",
"proxy_signal",
true,
"Aggregated service-pressure counter for runtime/test emitters.",
),
tail_latency_signal(
"service.cost_quota_consumed",
"tail.service.cost_quota_consumed",
"cost_units",
"stats_struct",
"asupersync::observability::resource_accounting::ResourceAccountingSnapshot::cost_quota_consumed",
"src/observability/resource_accounting.rs",
"proxy_signal",
false,
"Extended service-pressure counter for cost-aware workloads.",
),
],
}
}
fn io_or_network_tail_latency_term() -> TailLatencyTermSpec {
TailLatencyTermSpec {
term_id: "io_or_network".to_string(),
description: "Latency spent waiting on or draining reactor/network activity.".to_string(),
direct_duration_key: "tail.io_or_network.ns".to_string(),
attribution_state_key: "tail.io_or_network.attribution_state".to_string(),
signals: vec![
tail_latency_signal(
"io_or_network.events_received",
"tail.io_or_network.events_received",
"count",
"stats_struct",
"asupersync::runtime::io_driver::IoStats::events_received",
"src/runtime/io_driver.rs",
"proxy_signal",
true,
"Canonical always-on I/O/network pressure proxy from the reactor driver.",
),
tail_latency_signal(
"io_or_network.polls",
"tail.io_or_network.polls",
"count",
"stats_struct",
"asupersync::runtime::io_driver::IoStats::polls",
"src/runtime/io_driver.rs",
"proxy_signal",
false,
"Extended reactor activity proxy for sustained polling pressure.",
),
tail_latency_signal(
"io_or_network.wakers_dispatched",
"tail.io_or_network.wakers_dispatched",
"count",
"stats_struct",
"asupersync::runtime::io_driver::IoStats::wakers_dispatched",
"src/runtime/io_driver.rs",
"proxy_signal",
false,
"Extended proxy for wake fan-out caused by readiness events.",
),
],
}
}
fn retries_tail_latency_term() -> TailLatencyTermSpec {
TailLatencyTermSpec {
term_id: "retries".to_string(),
description:
"Backoff and reattempt inflation introduced by retry/rate-limit/circuit-breaker control loops."
.to_string(),
direct_duration_key: "tail.retries.ns".to_string(),
attribution_state_key: "tail.retries.attribution_state".to_string(),
signals: vec![
tail_latency_signal(
"retries.total_delay_ns",
"tail.retries.total_delay_ns",
"ns",
"state_field",
"asupersync::combinator::retry::RetryState::total_delay",
"src/combinator/retry.rs",
"direct_duration",
true,
"Direct retry-delay contribution from the retry combinator.",
),
tail_latency_signal(
"retries.rate_limit_total_wait_ns",
"tail.retries.rate_limit_total_wait_ns",
"ns",
"stats_struct",
"asupersync::combinator::rate_limit::RateLimitMetrics::total_wait_time",
"src/combinator/rate_limit.rs",
"direct_duration",
false,
"Extended direct delay when token-bucket admission defers work.",
),
tail_latency_signal(
"retries.circuit_rejected_total",
"tail.retries.circuit_rejected_total",
"count",
"stats_struct",
"asupersync::combinator::circuit_breaker::CircuitBreakerMetrics::total_rejected",
"src/combinator/circuit_breaker.rs",
"proxy_signal",
false,
"Extended retry/control-loop pressure proxy when open circuits reject work.",
),
],
}
}
fn synchronization_tail_latency_term() -> TailLatencyTermSpec {
TailLatencyTermSpec {
term_id: "synchronization".to_string(),
description:
"Coordination delay from locks, pools, obligations, and cancellation-aware rendezvous."
.to_string(),
direct_duration_key: "tail.synchronization.ns".to_string(),
attribution_state_key: "tail.synchronization.attribution_state".to_string(),
signals: vec![
tail_latency_signal(
"synchronization.lock_wait_ns",
"tail.synchronization.lock_wait_ns",
"ns",
"stats_struct",
"asupersync::sync::contended_mutex::LockMetricsSnapshot::wait_ns",
"src/sync/contended_mutex.rs",
"direct_duration",
true,
"Canonical direct synchronization delay from contention-instrumented locks.",
),
tail_latency_signal(
"synchronization.lock_hold_ns",
"tail.synchronization.lock_hold_ns",
"ns",
"stats_struct",
"asupersync::sync::contended_mutex::LockMetricsSnapshot::hold_ns",
"src/sync/contended_mutex.rs",
"proxy_signal",
false,
"Extended proxy for convoying and long critical sections.",
),
tail_latency_signal(
"synchronization.pool_total_wait_ns",
"tail.synchronization.pool_total_wait_ns",
"ns",
"stats_struct",
"asupersync::sync::pool::PoolStats::total_wait_time",
"src/sync/pool.rs",
"direct_duration",
false,
"Extended direct delay from resource-pool acquisition waits.",
),
tail_latency_signal(
"synchronization.obligations_pending",
"tail.synchronization.obligations_pending",
"count",
"stats_struct",
"asupersync::observability::resource_accounting::ResourceAccountingSnapshot::obligations_pending",
"src/observability/resource_accounting.rs",
"proxy_signal",
true,
"Captures obligation/cancellation backlog that can extend synchronization tails.",
),
],
}
}
fn allocator_or_cache_tail_latency_term() -> TailLatencyTermSpec {
TailLatencyTermSpec {
term_id: "allocator_or_cache".to_string(),
description:
"Allocator and cache-locality pressure observable from region-heap churn and memory high-water marks."
.to_string(),
direct_duration_key: "tail.allocator_or_cache.ns".to_string(),
attribution_state_key: "tail.allocator_or_cache.attribution_state".to_string(),
signals: vec![
tail_latency_signal(
"allocator_or_cache.live_allocations",
"tail.allocator_or_cache.live_allocations",
"count",
"stats_struct",
"asupersync::runtime::region_heap::HeapStats::live",
"src/runtime/region_heap.rs",
"proxy_signal",
true,
"Canonical allocator-pressure proxy from live region-heap allocations.",
),
tail_latency_signal(
"allocator_or_cache.bytes_live",
"tail.allocator_or_cache.bytes_live",
"bytes",
"stats_struct",
"asupersync::runtime::region_heap::HeapStats::bytes_live",
"src/runtime/region_heap.rs",
"proxy_signal",
false,
"Extended allocator-pressure proxy for live retained bytes.",
),
tail_latency_signal(
"allocator_or_cache.heap_bytes_peak",
"tail.allocator_or_cache.heap_bytes_peak",
"bytes",
"stats_struct",
"asupersync::observability::resource_accounting::ResourceAccountingSnapshot::heap_bytes_peak",
"src/observability/resource_accounting.rs",
"proxy_signal",
false,
"Extended region-level memory high-water mark for cache/allocator analysis.",
),
],
}
}
fn unknown_tail_latency_term() -> TailLatencyTermSpec {
TailLatencyTermSpec {
term_id: "unknown".to_string(),
description:
"Residual latency that remains unattributed after measured terms and proxies are accounted for."
.to_string(),
direct_duration_key: "tail.unknown.unmeasured_ns".to_string(),
attribution_state_key: "tail.unknown.attribution_state".to_string(),
signals: vec![tail_latency_signal(
"unknown.unmeasured_ns",
"tail.unknown.unmeasured_ns",
"ns",
"contract_field",
"asupersync::observability::diagnostics::tail_latency_taxonomy_contract",
"src/observability/diagnostics.rs",
"unknown_bucket",
true,
"Must be emitted whenever any term lacks direct attribution so latency does not disappear from evidence bundles.",
)],
}
}
#[must_use]
pub fn tail_latency_taxonomy_contract() -> TailLatencyTaxonomyContract {
TailLatencyTaxonomyContract {
contract_version: TAIL_LATENCY_TAXONOMY_CONTRACT_VERSION.to_string(),
equation: "tail_latency_ns = queueing_ns + service_ns + io_or_network_ns + retries_ns + synchronization_ns + allocator_or_cache_ns + unknown_ns".to_string(),
total_latency_key: "tail.total_latency_ns".to_string(),
unknown_bucket_key: "tail.unknown.unmeasured_ns".to_string(),
required_log_fields: vec![
tail_latency_log_field(
"tail.contract_version",
"schema_id",
true,
"Versioned tail-latency taxonomy contract identifier.",
),
tail_latency_log_field(
"tail.total_latency_ns",
"ns",
true,
"Observed end-to-end tail latency for the operation under analysis.",
),
tail_latency_log_field(
"tail.queueing.ready_queue_depth",
"count",
true,
"Always-on queueing proxy based on runnable backlog.",
),
tail_latency_log_field(
"tail.service.poll_count",
"count",
true,
"Service-side work proxy based on task poll demand.",
),
tail_latency_log_field(
"tail.io_or_network.events_received",
"count",
true,
"I/O or network pressure proxy based on reactor event volume.",
),
tail_latency_log_field(
"tail.retries.total_delay_ns",
"ns",
true,
"Direct retry/backoff delay accumulated by retry combinators.",
),
tail_latency_log_field(
"tail.synchronization.lock_wait_ns",
"ns",
true,
"Direct synchronization delay from contention-instrumented locks.",
),
tail_latency_log_field(
"tail.allocator_or_cache.live_allocations",
"count",
true,
"Allocator/cache pressure proxy based on live region-heap allocations.",
),
tail_latency_log_field(
"tail.unknown.unmeasured_ns",
"ns",
true,
"Residual latency that remains unattributed after measured terms and proxies are recorded.",
),
],
terms: vec![
queueing_tail_latency_term(),
service_tail_latency_term(),
io_or_network_tail_latency_term(),
retries_tail_latency_term(),
synchronization_tail_latency_term(),
allocator_or_cache_tail_latency_term(),
unknown_tail_latency_term(),
],
sampling_policy: vec![
"Always emit the required core fields for any tail-latency event, even when extended observability sampling is disabled.".to_string(),
"Extended fields may be sampled or emitted only in replay/forensics modes, but they must retain the stable keys defined here.".to_string(),
"If a direct-duration field is unavailable for a term, preserve proxy signals and roll the residual duration into tail.unknown.unmeasured_ns.".to_string(),
],
compatibility_notes: vec![
"Structured-log keys are append-only within a contract version; removals or unit changes require a new contract version.".to_string(),
"Proxy signals are not interchangeable with direct-duration fields; emitters must preserve both semantics explicitly.".to_string(),
"Unknown contribution is mandatory whenever attribution is incomplete so downstream controllers never treat missing data as zero.".to_string(),
],
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BaselineLogEvent<'a> {
pub flow_id: &'a str,
pub event_kind: &'a str,
pub outcome_class: &'a str,
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum AdvancedClassificationConflict {
FlowEventMismatch {
flow_id: String,
event_kind: String,
},
OutcomeEventMismatch {
event_kind: String,
outcome_class: String,
},
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AdvancedLogClassification {
pub event_class: AdvancedEventClass,
pub severity: AdvancedSeverity,
pub dimensions: Vec<TroubleshootingDimension>,
pub narrative: String,
pub recommended_action: String,
pub conflicts: Vec<AdvancedClassificationConflict>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BaselineFlowId {
Execution,
Integration,
Remediation,
Replay,
}
impl BaselineFlowId {
fn parse(raw: &str) -> Option<Self> {
match raw {
"execution" => Some(Self::Execution),
"integration" => Some(Self::Integration),
"remediation" => Some(Self::Remediation),
"replay" => Some(Self::Replay),
_ => None,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BaselineEventKind {
CommandComplete,
CommandStart,
IntegrationError,
IntegrationSync,
RemediationApply,
RemediationVerify,
ReplayComplete,
ReplayStart,
VerificationSummary,
}
impl BaselineEventKind {
fn parse(raw: &str) -> Option<Self> {
match raw {
"command_complete" => Some(Self::CommandComplete),
"command_start" => Some(Self::CommandStart),
"integration_error" => Some(Self::IntegrationError),
"integration_sync" => Some(Self::IntegrationSync),
"remediation_apply" => Some(Self::RemediationApply),
"remediation_verify" => Some(Self::RemediationVerify),
"replay_complete" => Some(Self::ReplayComplete),
"replay_start" => Some(Self::ReplayStart),
"verification_summary" => Some(Self::VerificationSummary),
_ => None,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BaselineOutcomeClass {
Cancelled,
Failed,
Success,
}
impl BaselineOutcomeClass {
fn parse(raw: &str) -> Option<Self> {
match raw {
"cancelled" => Some(Self::Cancelled),
"failed" => Some(Self::Failed),
"success" => Some(Self::Success),
_ => None,
}
}
}
#[must_use]
pub fn advanced_observability_contract() -> AdvancedObservabilityContract {
AdvancedObservabilityContract {
contract_version: ADVANCED_OBSERVABILITY_CONTRACT_VERSION.to_string(),
baseline_contract_version: ADVANCED_OBSERVABILITY_BASELINE_VERSION.to_string(),
event_classes: vec![
AdvancedEventClassSpec {
class_id: AdvancedEventClass::CommandLifecycle.as_str().to_string(),
description: "Execution command lifecycle and gate telemetry.".to_string(),
},
AdvancedEventClassSpec {
class_id: AdvancedEventClass::IntegrationReliability
.as_str()
.to_string(),
description: "Cross-system integration health and boundary reliability."
.to_string(),
},
AdvancedEventClassSpec {
class_id: AdvancedEventClass::RemediationSafety.as_str().to_string(),
description: "Remediation safety, application, and post-fix verification."
.to_string(),
},
AdvancedEventClassSpec {
class_id: AdvancedEventClass::ReplayDeterminism.as_str().to_string(),
description: "Replay lifecycle and deterministic reproducibility.".to_string(),
},
AdvancedEventClassSpec {
class_id: AdvancedEventClass::VerificationGovernance
.as_str()
.to_string(),
description: "Verification summary and governance gate posture.".to_string(),
},
],
severity_semantics: vec![
AdvancedSeveritySpec {
severity: AdvancedSeverity::Critical.as_str().to_string(),
meaning: "Contract/taxonomy contradiction requiring immediate correction."
.to_string(),
},
AdvancedSeveritySpec {
severity: AdvancedSeverity::Error.as_str().to_string(),
meaning: "Actionable failure impacting reliability or correctness.".to_string(),
},
AdvancedSeveritySpec {
severity: AdvancedSeverity::Info.as_str().to_string(),
meaning: "Expected state transition with no direct intervention required."
.to_string(),
},
AdvancedSeveritySpec {
severity: AdvancedSeverity::Warning.as_str().to_string(),
meaning: "Non-terminal issue or cancellation requiring review.".to_string(),
},
],
troubleshooting_dimensions: vec![
TroubleshootingDimensionSpec {
dimension: TroubleshootingDimension::CancellationPath
.as_str()
.to_string(),
purpose: "Track request/drain/finalize behavior for cancelled runs.".to_string(),
},
TroubleshootingDimensionSpec {
dimension: TroubleshootingDimension::ContractCompliance
.as_str()
.to_string(),
purpose: "Validate schema, gate, and policy conformance.".to_string(),
},
TroubleshootingDimensionSpec {
dimension: TroubleshootingDimension::Determinism.as_str().to_string(),
purpose: "Confirm replay stability and deterministic artifact lineage.".to_string(),
},
TroubleshootingDimensionSpec {
dimension: TroubleshootingDimension::ExternalDependency
.as_str()
.to_string(),
purpose: "Isolate third-party/system boundary failures.".to_string(),
},
TroubleshootingDimensionSpec {
dimension: TroubleshootingDimension::OperatorAction
.as_str()
.to_string(),
purpose: "Prioritize immediate operator decision paths.".to_string(),
},
TroubleshootingDimensionSpec {
dimension: TroubleshootingDimension::RecoveryPlanning
.as_str()
.to_string(),
purpose: "Drive remediation and verify-after-change sequencing.".to_string(),
},
TroubleshootingDimensionSpec {
dimension: TroubleshootingDimension::RuntimeInvariant
.as_str()
.to_string(),
purpose: "Connect events to runtime invariant health.".to_string(),
},
],
compatibility_notes: vec![
"Additive dimensions/classes may be introduced without baseline schema changes."
.to_string(),
"Field removals or semantic redefinitions require a contract-version bump.".to_string(),
"Unknown baseline flow/event/outcome values are hard validation errors.".to_string(),
],
}
}
pub fn classify_baseline_log_event(
event: BaselineLogEvent<'_>,
) -> Result<AdvancedLogClassification, String> {
let flow = BaselineFlowId::parse(event.flow_id)
.ok_or_else(|| format!("unknown flow_id {}", event.flow_id))?;
let kind = BaselineEventKind::parse(event.event_kind)
.ok_or_else(|| format!("unknown event_kind {}", event.event_kind))?;
let outcome = BaselineOutcomeClass::parse(event.outcome_class)
.ok_or_else(|| format!("unknown outcome_class {}", event.outcome_class))?;
let (event_class, mut dimensions, kind_narrative, action_hint) = kind_semantics(kind);
let mut conflicts = Vec::new();
let mut severity = match outcome {
BaselineOutcomeClass::Success => AdvancedSeverity::Info,
BaselineOutcomeClass::Cancelled => AdvancedSeverity::Warning,
BaselineOutcomeClass::Failed => AdvancedSeverity::Error,
};
if !flow_allows_event(flow, kind) {
conflicts.push(AdvancedClassificationConflict::FlowEventMismatch {
flow_id: event.flow_id.to_string(),
event_kind: event.event_kind.to_string(),
});
severity = AdvancedSeverity::Critical;
dimensions.push(TroubleshootingDimension::ContractCompliance);
}
if kind == BaselineEventKind::IntegrationError && outcome == BaselineOutcomeClass::Success {
conflicts.push(AdvancedClassificationConflict::OutcomeEventMismatch {
event_kind: event.event_kind.to_string(),
outcome_class: event.outcome_class.to_string(),
});
severity = severity.max(AdvancedSeverity::Error);
dimensions.push(TroubleshootingDimension::ContractCompliance);
}
if outcome == BaselineOutcomeClass::Cancelled {
dimensions.push(TroubleshootingDimension::CancellationPath);
}
if outcome == BaselineOutcomeClass::Failed {
dimensions.push(TroubleshootingDimension::RecoveryPlanning);
}
dimensions.sort_unstable();
dimensions.dedup();
conflicts.sort();
let outcome_phrase = match outcome {
BaselineOutcomeClass::Success => "completed successfully",
BaselineOutcomeClass::Cancelled => "was cancelled",
BaselineOutcomeClass::Failed => "failed",
};
Ok(AdvancedLogClassification {
event_class,
severity,
dimensions,
narrative: format!(
"{}:{} {}. {}",
event.flow_id, event.event_kind, outcome_phrase, kind_narrative
),
recommended_action: if conflicts.is_empty() {
action_hint.to_string()
} else {
format!(
"{action_hint} Resolve taxonomy conflicts before trusting downstream automation."
)
},
conflicts,
})
}
pub fn classify_baseline_log_events(
events: &[BaselineLogEvent<'_>],
) -> Result<Vec<AdvancedLogClassification>, String> {
events
.iter()
.map(|event| classify_baseline_log_event(*event))
.collect()
}
fn flow_allows_event(flow: BaselineFlowId, kind: BaselineEventKind) -> bool {
match flow {
BaselineFlowId::Execution => matches!(
kind,
BaselineEventKind::CommandComplete
| BaselineEventKind::CommandStart
| BaselineEventKind::VerificationSummary
),
BaselineFlowId::Integration => matches!(
kind,
BaselineEventKind::IntegrationError
| BaselineEventKind::IntegrationSync
| BaselineEventKind::VerificationSummary
),
BaselineFlowId::Remediation => matches!(
kind,
BaselineEventKind::RemediationApply
| BaselineEventKind::RemediationVerify
| BaselineEventKind::VerificationSummary
),
BaselineFlowId::Replay => matches!(
kind,
BaselineEventKind::ReplayComplete
| BaselineEventKind::ReplayStart
| BaselineEventKind::VerificationSummary
),
}
}
fn kind_semantics(
kind: BaselineEventKind,
) -> (
AdvancedEventClass,
Vec<TroubleshootingDimension>,
&'static str,
&'static str,
) {
match kind {
BaselineEventKind::CommandComplete => (
AdvancedEventClass::CommandLifecycle,
vec![
TroubleshootingDimension::ContractCompliance,
TroubleshootingDimension::OperatorAction,
],
"Execution gate completed and emitted a deterministic artifact pointer",
"Review gate summary and continue pipeline progression.",
),
BaselineEventKind::CommandStart => (
AdvancedEventClass::CommandLifecycle,
vec![TroubleshootingDimension::OperatorAction],
"Execution gate started with reproducible command provenance",
"Monitor for completion and verify emitted command provenance.",
),
BaselineEventKind::IntegrationError => (
AdvancedEventClass::IntegrationReliability,
vec![
TroubleshootingDimension::ExternalDependency,
TroubleshootingDimension::OperatorAction,
],
"Integration boundary reported an error at an external/system edge",
"Inspect integration target, retry posture, and boundary adapter diagnostics.",
),
BaselineEventKind::IntegrationSync => (
AdvancedEventClass::IntegrationReliability,
vec![TroubleshootingDimension::ExternalDependency],
"Integration synchronization event captured adapter boundary state",
"Verify upstream/downstream contract alignment for this sync point.",
),
BaselineEventKind::RemediationApply => (
AdvancedEventClass::RemediationSafety,
vec![
TroubleshootingDimension::ContractCompliance,
TroubleshootingDimension::RecoveryPlanning,
],
"Remediation apply phase executed against diagnosed findings",
"Confirm changes are scoped and queue remediation verification.",
),
BaselineEventKind::RemediationVerify => (
AdvancedEventClass::RemediationSafety,
vec![
TroubleshootingDimension::ContractCompliance,
TroubleshootingDimension::RecoveryPlanning,
],
"Post-remediation verification assessed health deltas and invariants",
"Evaluate health delta and close or reopen remediation loops.",
),
BaselineEventKind::ReplayComplete => (
AdvancedEventClass::ReplayDeterminism,
vec![
TroubleshootingDimension::Determinism,
TroubleshootingDimension::RuntimeInvariant,
],
"Replay completion captured deterministic scenario convergence status",
"Compare replay artifacts against baseline and investigate divergence.",
),
BaselineEventKind::ReplayStart => (
AdvancedEventClass::ReplayDeterminism,
vec![TroubleshootingDimension::Determinism],
"Replay start established deterministic execution context",
"Track replay progress and preserve trace/evidence join keys.",
),
BaselineEventKind::VerificationSummary => (
AdvancedEventClass::VerificationGovernance,
vec![
TroubleshootingDimension::ContractCompliance,
TroubleshootingDimension::Determinism,
TroubleshootingDimension::RuntimeInvariant,
],
"Verification summary synthesized gate outcomes for governance review",
"Use summary to decide promotion, rollback, or targeted investigation.",
),
}
}
impl DirectionalDeadlockReport {
#[must_use]
pub fn to_ndjson(&self) -> String {
let mut lines = Vec::new();
lines.push(serde_json::json!({
"type": "deadlock_report",
"severity": format!("{:?}", self.severity),
"risk_score": self.risk_score
}));
for (i, cycle) in self.cycles.iter().enumerate() {
lines.push(serde_json::json!({
"type": "deadlock_cycle",
"cycle_index": i,
"cycle_length": cycle.tasks.len(),
"task_ids": cycle.tasks.iter().map(|id| format!("{:?}", id)).collect::<Vec<_>>()
}));
}
lines
.into_iter()
.map(|line| line.to_string())
.collect::<Vec<_>>()
.join("\n")
}
}
impl RegionOpenExplanation {
#[must_use]
pub fn to_ndjson(&self) -> String {
let mut lines = Vec::new();
lines.push(serde_json::json!({
"type": "region_explanation",
"region_id": format!("{:?}", self.region_id),
"region_state": format!("{:?}", self.region_state),
"reason_count": self.reasons.len(),
"recommendation_count": self.recommendations.len()
}));
for (i, reason) in self.reasons.iter().enumerate() {
lines.push(serde_json::json!({
"type": "region_reason",
"region_id": format!("{:?}", self.region_id),
"reason_index": i,
"reason": format!("{:?}", reason)
}));
}
for (i, rec) in self.recommendations.iter().enumerate() {
lines.push(serde_json::json!({
"type": "region_recommendation",
"region_id": format!("{:?}", self.region_id),
"recommendation_index": i,
"recommendation": rec
}));
}
lines
.into_iter()
.map(|line| line.to_string())
.collect::<Vec<_>>()
.join("\n")
}
}
impl TaskBlockedExplanation {
#[must_use]
pub fn to_ndjson(&self) -> String {
let mut lines = Vec::new();
lines.push(serde_json::json!({
"type": "task_explanation",
"task_id": format!("{:?}", self.task_id),
"block_reason": format!("{:?}", self.block_reason),
"detail_count": self.details.len(),
"recommendation_count": self.recommendations.len()
}));
for (i, detail) in self.details.iter().enumerate() {
lines.push(serde_json::json!({
"type": "task_detail",
"task_id": format!("{:?}", self.task_id),
"detail_index": i,
"detail": detail
}));
}
for (i, rec) in self.recommendations.iter().enumerate() {
lines.push(serde_json::json!({
"type": "task_recommendation",
"task_id": format!("{:?}", self.task_id),
"recommendation_index": i,
"recommendation": rec
}));
}
lines
.into_iter()
.map(|line| line.to_string())
.collect::<Vec<_>>()
.join("\n")
}
}
impl ObligationLeak {
#[must_use]
pub fn to_ndjson(&self) -> String {
serde_json::json!({
"type": "obligation_leak",
"obligation_id": format!("{:?}", self.obligation_id),
"obligation_type": self.obligation_type,
"holder_task": self.holder_task.map(|id| format!("{:?}", id)),
"region_id": format!("{:?}", self.region_id),
"age_ms": self.age.as_millis()
})
.to_string()
}
}
#[cfg(test)]
#[allow(clippy::arc_with_non_send_sync)]
mod tests {
use super::*;
use crate::observability::spectral_health::HealthClassification;
use crate::record::obligation::{ObligationKind, ObligationRecord};
use crate::record::region::RegionRecord;
use crate::record::task::{TaskRecord, TaskState};
use crate::time::{TimerDriverHandle, VirtualClock};
use crate::types::{Budget, CancelReason, Outcome};
use crate::util::ArenaIndex;
use serde_json::{Value, json};
use std::fmt::Write as _;
use std::sync::Arc;
fn init_test(name: &str) {
crate::test_utils::init_test_logging();
crate::test_phase!(name);
}
fn insert_child_region(state: &mut RuntimeState, parent: RegionId) -> RegionId {
let idx = state.regions.insert(RegionRecord::new(
RegionId::from_arena(ArenaIndex::new(0, 0)),
Some(parent),
Budget::INFINITE,
));
let id = RegionId::from_arena(idx);
let record = state.regions.get_mut(idx).expect("child region missing");
record.id = id;
let added = state
.regions
.get(parent.arena_index())
.expect("parent missing")
.add_child(id);
crate::assert_with_log!(added.is_ok(), "child added", true, added.is_ok());
id
}
fn insert_task(state: &mut RuntimeState, region: RegionId, task_state: TaskState) -> TaskId {
let idx = state.insert_task(TaskRecord::new(
TaskId::from_arena(ArenaIndex::new(0, 0)),
region,
Budget::INFINITE,
));
let id = TaskId::from_arena(idx);
let record = state.task_mut(id).expect("task missing");
record.id = id;
record.state = task_state;
let added = state
.regions
.get(region.arena_index())
.expect("region missing")
.add_task(id);
crate::assert_with_log!(added.is_ok(), "task added", true, added.is_ok());
id
}
fn insert_obligation(
state: &mut RuntimeState,
region: RegionId,
holder: TaskId,
kind: ObligationKind,
reserved_at: Time,
) -> ObligationId {
let idx = state.obligations.insert(ObligationRecord::new(
ObligationId::from_arena(ArenaIndex::new(0, 0)),
kind,
holder,
region,
reserved_at,
));
let id = ObligationId::from_arena(idx);
let record = state.obligations.get_mut(idx).expect("obligation missing");
record.id = id;
id
}
fn render_structured_diagnostic_report(
scenario: &str,
generated_at: &str,
region: Option<&RegionOpenExplanation>,
task: Option<&TaskBlockedExplanation>,
leaks: &[ObligationLeak],
) -> String {
let mut rendered = String::new();
writeln!(&mut rendered, "scenario: {scenario}").expect("write scenario");
writeln!(&mut rendered, "generated_at: {generated_at}").expect("write timestamp");
rendered.push_str("\n[region]\n");
if let Some(region) = region {
rendered.push_str(®ion.to_string());
} else {
rendered.push_str("none\n");
}
rendered.push_str("\n[task]\n");
if let Some(task) = task {
rendered.push_str(&task.to_string());
} else {
rendered.push_str("none\n");
}
rendered.push_str("\n[leaks]\n");
if leaks.is_empty() {
rendered.push_str("none\n");
} else {
for leak in leaks {
writeln!(
&mut rendered,
"- {:?} region={:?} holder={:?} type={} age_ms={}",
leak.obligation_id,
leak.region_id,
leak.holder_task,
leak.obligation_type,
leak.age.as_millis()
)
.expect("write leak");
}
}
rendered.trim_end().to_string()
}
fn render_structured_diagnostic_report_v2(sections: &[(&str, &str)]) -> String {
let mut rendered = String::from("report_version: v2");
for (label, section) in sections {
writeln!(&mut rendered, "\n\n[{label}]").expect("write section label");
rendered.push_str(section);
}
rendered.trim_end().to_string()
}
struct DiagnosticResourceAccounting {
total_regions: usize,
open_regions: usize,
total_tasks: usize,
live_tasks: usize,
total_obligations: usize,
leaked_obligations: usize,
}
struct DiagnosticReportV3Section<'a> {
label: &'a str,
status: &'a str,
accounting: DiagnosticResourceAccounting,
rendered: &'a str,
}
#[derive(Debug, Clone)]
struct DiagnosticMetricHistogram {
name: String,
buckets: Vec<(String, u64)>, total_count: u64,
percentiles: Vec<(f64, f64)>, }
struct DiagnosticReportV4Section<'a> {
label: &'a str,
status: &'a str,
accounting: DiagnosticResourceAccounting,
histograms: Vec<DiagnosticMetricHistogram>,
rendered: &'a str,
}
fn diagnostic_resource_accounting(
diagnostics: &Diagnostics,
leaked_obligations: usize,
) -> DiagnosticResourceAccounting {
let total_regions = diagnostics.state.regions.iter().count();
let open_regions = diagnostics
.state
.regions
.iter()
.filter(|(_, region)| region.state() != RegionState::Closed)
.count();
let total_tasks = diagnostics.state.tasks_iter().count();
let live_tasks = diagnostics
.state
.tasks_iter()
.filter(|(_, task)| !task.state.is_terminal())
.count();
let total_obligations = diagnostics.state.obligations.iter().count();
DiagnosticResourceAccounting {
total_regions,
open_regions,
total_tasks,
live_tasks,
total_obligations,
leaked_obligations,
}
}
fn render_structured_diagnostic_report_v3(
sections: &[DiagnosticReportV3Section<'_>],
) -> String {
let mut rendered = String::from("report_version: v3");
let passing_count = sections
.iter()
.filter(|section| section.status == "passing")
.count();
let degraded_count = sections
.iter()
.filter(|section| section.status == "degraded")
.count();
let critical_count = sections
.iter()
.filter(|section| section.status == "critical")
.count();
writeln!(&mut rendered, "\n\n[summary]").expect("write summary label");
writeln!(&mut rendered, "scenario_count: {}", sections.len())
.expect("write scenario count");
writeln!(&mut rendered, "passing_count: {passing_count}").expect("write passing count");
writeln!(&mut rendered, "degraded_count: {degraded_count}").expect("write degraded count");
writeln!(&mut rendered, "critical_count: {critical_count}").expect("write critical count");
for section in sections {
writeln!(&mut rendered, "\n\n[{}]", section.label).expect("write section label");
writeln!(&mut rendered, "status: {}", section.status).expect("write status");
rendered.push_str("resource_accounting:\n");
writeln!(
&mut rendered,
" - regions_total: {}",
section.accounting.total_regions
)
.expect("write total regions");
writeln!(
&mut rendered,
" - regions_open: {}",
section.accounting.open_regions
)
.expect("write open regions");
writeln!(
&mut rendered,
" - tasks_total: {}",
section.accounting.total_tasks
)
.expect("write total tasks");
writeln!(
&mut rendered,
" - tasks_live: {}",
section.accounting.live_tasks
)
.expect("write live tasks");
writeln!(
&mut rendered,
" - obligations_total: {}",
section.accounting.total_obligations
)
.expect("write total obligations");
writeln!(
&mut rendered,
" - obligations_leaked: {}",
section.accounting.leaked_obligations
)
.expect("write leaked obligations");
rendered.push_str("report:\n");
for line in section.rendered.lines() {
writeln!(&mut rendered, " {line}").expect("write report line");
}
}
rendered.trim_end().to_string()
}
fn render_structured_diagnostic_report_v4(
sections: &[DiagnosticReportV4Section<'_>],
) -> String {
let mut rendered = String::from("report_version: v4");
let passing_count = sections
.iter()
.filter(|section| section.status == "passing")
.count();
let degraded_count = sections
.iter()
.filter(|section| section.status == "degraded")
.count();
let critical_count = sections
.iter()
.filter(|section| section.status == "critical")
.count();
writeln!(&mut rendered, "\n\n[summary]").expect("write summary label");
writeln!(&mut rendered, "scenario_count: {}", sections.len())
.expect("write scenario count");
writeln!(&mut rendered, "passing_count: {passing_count}").expect("write passing count");
writeln!(&mut rendered, "degraded_count: {degraded_count}").expect("write degraded count");
writeln!(&mut rendered, "critical_count: {critical_count}").expect("write critical count");
let total_histogram_count: u64 = sections
.iter()
.flat_map(|section| §ion.histograms)
.map(|h| h.total_count)
.sum();
writeln!(
&mut rendered,
"total_histogram_samples: {total_histogram_count}"
)
.expect("write total histogram samples");
let histogram_types: std::collections::BTreeSet<&String> = sections
.iter()
.flat_map(|section| §ion.histograms)
.map(|h| &h.name)
.collect();
writeln!(
&mut rendered,
"histogram_types: [{}]",
histogram_types
.iter()
.map(|s| s.as_str())
.collect::<Vec<_>>()
.join(", ")
)
.expect("write histogram types");
for section in sections {
writeln!(&mut rendered, "\n\n[{}]", section.label).expect("write section label");
writeln!(&mut rendered, "status: {}", section.status).expect("write status");
rendered.push_str("resource_accounting:\n");
writeln!(
&mut rendered,
" - regions_total: {}",
section.accounting.total_regions
)
.expect("write total regions");
writeln!(
&mut rendered,
" - regions_open: {}",
section.accounting.open_regions
)
.expect("write open regions");
writeln!(
&mut rendered,
" - tasks_total: {}",
section.accounting.total_tasks
)
.expect("write total tasks");
writeln!(
&mut rendered,
" - tasks_live: {}",
section.accounting.live_tasks
)
.expect("write live tasks");
writeln!(
&mut rendered,
" - obligations_total: {}",
section.accounting.total_obligations
)
.expect("write total obligations");
writeln!(
&mut rendered,
" - obligations_leaked: {}",
section.accounting.leaked_obligations
)
.expect("write leaked obligations");
if !section.histograms.is_empty() {
rendered.push_str("extended_metrics:\n");
for histogram in §ion.histograms {
writeln!(&mut rendered, " - name: {}", histogram.name)
.expect("write histogram name");
writeln!(
&mut rendered,
" total_samples: {}",
histogram.total_count
)
.expect("write histogram total");
if !histogram.buckets.is_empty() {
rendered.push_str(" buckets:\n");
for (bucket_name, count) in &histogram.buckets {
writeln!(&mut rendered, " - {}: {}", bucket_name, count)
.expect("write bucket");
}
}
if !histogram.percentiles.is_empty() {
rendered.push_str(" percentiles:\n");
for (percentile, value) in &histogram.percentiles {
writeln!(
&mut rendered,
" - p{:.1}: {:.3}",
percentile * 100.0,
value
)
.expect("write percentile");
}
}
}
}
rendered.push_str("report:\n");
for line in section.rendered.lines() {
writeln!(&mut rendered, " {line}").expect("write report line");
}
}
rendered.trim_end().to_string()
}
fn scrub_diagnostic_report_timestamps(rendered: &str) -> String {
rendered
.lines()
.map(|line| {
let trimmed = line.trim_start();
let detail = trimmed.strip_prefix("- ").unwrap_or(trimmed);
if line.starts_with("generated_at: ") {
"generated_at: <scrubbed>".to_string()
} else if detail.starts_with("next_retry_at: ") {
" - next_retry_at: <scrubbed>".to_string()
} else if detail.starts_with("observed_at: ") {
" - observed_at: <scrubbed>".to_string()
} else if detail.starts_with("deadline_at: ") {
" - deadline_at: <scrubbed>".to_string()
} else {
line.to_string()
}
})
.collect::<Vec<_>>()
.join("\n")
}
fn assert_diagnostic_report_snapshot(snapshot_name: &str, rendered: &str) {
insta::with_settings!({
snapshot_path => "../../tests/snapshots",
prepend_module_to_snapshot => false,
}, {
insta::assert_snapshot!(snapshot_name, rendered);
});
}
fn insert_wait_path(state: &mut RuntimeState, region: RegionId, len: usize) -> Vec<TaskId> {
let tasks: Vec<TaskId> = (0..len)
.map(|_| insert_task(state, region, TaskState::Running))
.collect();
for pair in tasks.windows(2) {
state
.task_mut(pair[0])
.expect("path task missing")
.waiters
.push(pair[1]);
}
tasks
}
fn round_metric(value: f64) -> f64 {
(value * 1_000.0).round() / 1_000.0
}
fn deadlock_severity_label(severity: DeadlockSeverity) -> &'static str {
match severity {
DeadlockSeverity::None => "none",
DeadlockSeverity::Elevated => "elevated",
DeadlockSeverity::Critical => "critical",
}
}
fn health_classification_json(classification: &HealthClassification) -> Value {
match classification {
HealthClassification::Deadlocked => json!({
"kind": "deadlocked",
}),
HealthClassification::Healthy { margin } => json!({
"kind": "healthy",
"margin": round_metric(*margin),
}),
HealthClassification::Degraded {
fiedler,
bottleneck_nodes,
} => json!({
"kind": "degraded",
"fiedler": round_metric(*fiedler),
"bottleneck_nodes": bottleneck_nodes,
}),
HealthClassification::Critical {
fiedler,
approaching_disconnect,
} => json!({
"kind": "critical",
"fiedler": round_metric(*fiedler),
"approaching_disconnect": approaching_disconnect,
}),
HealthClassification::Fragmented { components } => json!({
"kind": "fragmented",
"components": components,
}),
}
}
fn overall_health_status(
health: &SpectralHealthReport,
deadlock: &DirectionalDeadlockReport,
leak_count: usize,
) -> &'static str {
if deadlock.severity == DeadlockSeverity::Critical
|| matches!(
health.classification,
HealthClassification::Deadlocked
| HealthClassification::Critical { .. }
| HealthClassification::Fragmented { .. }
)
{
"critical"
} else if leak_count > 0
|| deadlock.severity == DeadlockSeverity::Elevated
|| matches!(health.classification, HealthClassification::Degraded { .. })
{
"degraded"
} else {
"passing"
}
}
fn render_diagnostic_healthcheck_json(
diagnostics: &Diagnostics,
region_id: RegionId,
generated_at: &str,
pid: u32,
) -> Value {
let health = diagnostics.analyze_structural_health();
let deadlock = diagnostics.analyze_directional_deadlock();
let region = diagnostics.explain_region_open(region_id);
let leaks = diagnostics.find_leaked_obligations();
let leak_count = leaks.len();
let max_age_ms = leaks
.iter()
.map(|leak| u64::try_from(leak.age.as_millis()).unwrap_or(u64::MAX))
.max()
.unwrap_or(0);
json!({
"generated_at": generated_at,
"pid": pid,
"status": overall_health_status(&health, &deadlock, leak_count),
"structural_health": {
"classification": health_classification_json(&health.classification),
"fiedler_value": round_metric(health.decomposition.fiedler_value),
"spectral_gap": round_metric(health.decomposition.spectral_gap),
"spectral_radius": round_metric(health.decomposition.spectral_radius),
"iterations_used": health.decomposition.iterations_used,
"bottleneck_count": health.bottlenecks.len(),
},
"directional_deadlock": {
"severity": deadlock_severity_label(deadlock.severity),
"risk_score": round_metric(deadlock.risk_score),
"cycle_count": deadlock.cycles.len(),
"trapped_cycle_count": deadlock.cycles.iter().filter(|cycle| cycle.trapped).count(),
"cycles": deadlock.cycles.iter().map(|cycle| json!({
"tasks": cycle.tasks.iter().map(|task| format!("{task:?}")).collect::<Vec<_>>(),
"trapped": cycle.trapped,
"ingress_edges": cycle.ingress_edges,
"egress_edges": cycle.egress_edges,
})).collect::<Vec<_>>(),
},
"region": {
"id": format!("{:?}", region.region_id),
"state": region.region_state.map(|state| format!("{state:?}")),
"reason_count": region.reasons.len(),
"recommendation_count": region.recommendations.len(),
},
"obligations": {
"leak_count": leak_count,
"max_age_ms": max_age_ms,
"leaks": leaks.iter().map(|leak| json!({
"obligation_id": format!("{:?}", leak.obligation_id),
"region_id": format!("{:?}", leak.region_id),
"holder_task": leak.holder_task.map(|task| format!("{task:?}")),
"obligation_type": &leak.obligation_type,
"age_ms": u64::try_from(leak.age.as_millis()).unwrap_or(u64::MAX),
})).collect::<Vec<_>>(),
},
})
}
fn scrub_diagnostic_healthcheck_json(mut value: Value) -> Value {
let Some(object) = value.as_object_mut() else {
return value;
};
object.insert(
"generated_at".to_string(),
Value::String("<scrubbed>".to_string()),
);
object.insert("pid".to_string(), Value::String("<scrubbed>".to_string()));
value
}
fn assert_diagnostic_healthcheck_snapshot(snapshot_name: &str, value: &Value) {
insta::with_settings!({
snapshot_path => "../../tests/snapshots",
prepend_module_to_snapshot => false,
}, {
insta::assert_json_snapshot!(snapshot_name, value);
});
}
#[test]
fn test_explain_region_open_unknown_region_returns_reason() {
init_test("test_explain_region_open_unknown_region_returns_reason");
let state = Arc::new(RuntimeState::new());
let diagnostics = Diagnostics::new(state);
let missing = RegionId::new_for_test(99, 0);
let explanation = diagnostics.explain_region_open(missing);
crate::assert_with_log!(
explanation.region_state.is_none(),
"region_state none",
true,
explanation.region_state.is_none()
);
crate::assert_with_log!(
explanation.reasons.len() == 1,
"single reason",
1usize,
explanation.reasons.len()
);
let is_not_found = matches!(explanation.reasons.first(), Some(Reason::RegionNotFound));
crate::assert_with_log!(is_not_found, "region not found reason", true, is_not_found);
let has_recommendation = explanation
.recommendations
.iter()
.any(|rec| rec.contains("Verify region id"));
crate::assert_with_log!(
has_recommendation,
"recommendation present",
true,
has_recommendation
);
crate::test_complete!("test_explain_region_open_unknown_region_returns_reason");
}
#[test]
fn test_explain_region_open_closed_region_has_no_reasons() {
init_test("test_explain_region_open_closed_region_has_no_reasons");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let region = state.region(root).expect("root missing");
let did_close =
region.begin_close(None) && region.begin_finalize() && region.complete_close();
crate::assert_with_log!(did_close, "region closed", true, did_close);
let diagnostics = Diagnostics::new(Arc::new(state));
let explanation = diagnostics.explain_region_open(root);
crate::assert_with_log!(
explanation.region_state == Some(RegionState::Closed),
"closed state",
true,
explanation.region_state == Some(RegionState::Closed)
);
crate::assert_with_log!(
explanation.reasons.is_empty(),
"no reasons",
true,
explanation.reasons.is_empty()
);
crate::assert_with_log!(
explanation.recommendations.is_empty(),
"no recommendations",
true,
explanation.recommendations.is_empty()
);
crate::test_complete!("test_explain_region_open_closed_region_has_no_reasons");
}
#[test]
fn test_explain_region_open_reports_children_tasks_obligations() {
init_test("test_explain_region_open_reports_children_tasks_obligations");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let child = insert_child_region(&mut state, root);
let task_id = insert_task(&mut state, root, TaskState::Running);
let task = state.task_mut(task_id).expect("task missing");
task.total_polls = 7;
let obligation_id = insert_obligation(
&mut state,
root,
task_id,
ObligationKind::SendPermit,
Time::from_millis(10),
);
let diagnostics = Diagnostics::new(Arc::new(state));
let explanation = diagnostics.explain_region_open(root);
let mut saw_child = false;
let mut saw_task = false;
let mut saw_obligation = false;
for reason in &explanation.reasons {
match reason {
Reason::ChildRegionOpen { child_id, .. } if *child_id == child => {
saw_child = true;
}
Reason::TaskRunning {
task_id: id,
poll_count,
..
} if *id == task_id && *poll_count == 7 => {
saw_task = true;
}
Reason::ObligationHeld {
obligation_id: id,
holder_task,
..
} if *id == obligation_id && *holder_task == task_id => {
saw_obligation = true;
}
_ => {}
}
}
crate::assert_with_log!(saw_child, "child reason", true, saw_child);
crate::assert_with_log!(saw_task, "task reason", true, saw_task);
crate::assert_with_log!(saw_obligation, "obligation reason", true, saw_obligation);
let recs = &explanation.recommendations;
let has_child_rec = recs.iter().any(|r| r.contains("child regions"));
let has_task_rec = recs.iter().any(|r| r.contains("live tasks"));
let has_obligation_rec = recs.iter().any(|r| r.contains("obligations"));
crate::assert_with_log!(has_child_rec, "child rec", true, has_child_rec);
crate::assert_with_log!(has_task_rec, "task rec", true, has_task_rec);
crate::assert_with_log!(
has_obligation_rec,
"obligation rec",
true,
has_obligation_rec
);
let rendered = explanation.to_string();
crate::assert_with_log!(
rendered.contains("child region"),
"display includes child",
true,
rendered.contains("child region")
);
crate::assert_with_log!(
rendered.contains("obligation"),
"display includes obligation",
true,
rendered.contains("obligation")
);
crate::test_complete!("test_explain_region_open_reports_children_tasks_obligations");
}
#[test]
fn test_explain_region_open_nested_child_reports_immediate_child() {
init_test("test_explain_region_open_nested_child_reports_immediate_child");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let child = insert_child_region(&mut state, root);
let grandchild = insert_child_region(&mut state, child);
let diagnostics = Diagnostics::new(Arc::new(state));
let explanation = diagnostics.explain_region_open(child);
let saw_grandchild = explanation.reasons.iter().any(|reason| {
matches!(
reason,
Reason::ChildRegionOpen { child_id, .. } if *child_id == grandchild
)
});
crate::assert_with_log!(saw_grandchild, "grandchild reason", true, saw_grandchild);
crate::test_complete!("test_explain_region_open_nested_child_reports_immediate_child");
}
#[test]
fn test_explain_task_blocked_running_notified_reports_schedule() {
init_test("test_explain_task_blocked_running_notified_reports_schedule");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let task_id = insert_task(&mut state, root, TaskState::Running);
let task = state.task_mut(task_id).expect("task missing");
let notified = task.wake_state.notify();
crate::assert_with_log!(notified, "wake notified", true, notified);
task.waiters.push(TaskId::new_for_test(77, 0));
let diagnostics = Diagnostics::new(Arc::new(state));
let explanation = diagnostics.explain_task_blocked(task_id);
crate::assert_with_log!(
matches!(explanation.block_reason, BlockReason::AwaitingSchedule),
"awaiting schedule",
true,
matches!(explanation.block_reason, BlockReason::AwaitingSchedule)
);
let has_waiters = explanation.details.iter().any(|d| d.contains("waiters"));
crate::assert_with_log!(has_waiters, "waiters detail", true, has_waiters);
crate::test_complete!("test_explain_task_blocked_running_notified_reports_schedule");
}
#[test]
fn test_explain_task_blocked_cancel_requested_includes_reason() {
init_test("test_explain_task_blocked_cancel_requested_includes_reason");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let reason = CancelReason::user("stop");
let cleanup_budget = reason.cleanup_budget();
let task_id = insert_task(
&mut state,
root,
TaskState::CancelRequested {
reason,
cleanup_budget,
},
);
let diagnostics = Diagnostics::new(Arc::new(state));
let explanation = diagnostics.explain_task_blocked(task_id);
let matches_reason = matches!(
explanation.block_reason,
BlockReason::CancelRequested {
reason: CancelReasonInfo {
kind: CancelKind::User,
message: Some(_)
}
}
);
crate::assert_with_log!(matches_reason, "cancel requested", true, matches_reason);
let rendered = explanation.to_string();
crate::assert_with_log!(
rendered.contains("cancel requested"),
"display includes cancel",
true,
rendered.contains("cancel requested")
);
crate::test_complete!("test_explain_task_blocked_cancel_requested_includes_reason");
}
#[test]
fn test_explain_task_blocked_completed_reports_completed() {
init_test("test_explain_task_blocked_completed_reports_completed");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let task_id = insert_task(&mut state, root, TaskState::Completed(Outcome::Ok(())));
let diagnostics = Diagnostics::new(Arc::new(state));
let explanation = diagnostics.explain_task_blocked(task_id);
crate::assert_with_log!(
matches!(explanation.block_reason, BlockReason::Completed),
"completed",
true,
matches!(explanation.block_reason, BlockReason::Completed)
);
crate::test_complete!("test_explain_task_blocked_completed_reports_completed");
}
#[test]
fn test_find_leaked_obligations_sorted_and_aged() {
init_test("test_find_leaked_obligations_sorted_and_aged");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let child = insert_child_region(&mut state, root);
let clock = Arc::new(VirtualClock::starting_at(Time::from_millis(100)));
state.set_timer_driver(TimerDriverHandle::with_virtual_clock(Arc::clone(&clock)));
let root_task = insert_task(&mut state, root, TaskState::Running);
let child_task = insert_task(&mut state, child, TaskState::Running);
let root_ob = insert_obligation(
&mut state,
root,
root_task,
ObligationKind::Ack,
Time::from_millis(10),
);
let child_ob = insert_obligation(
&mut state,
child,
child_task,
ObligationKind::Lease,
Time::from_millis(20),
);
let diagnostics = Diagnostics::new(Arc::new(state));
let leaks = diagnostics.find_leaked_obligations();
crate::assert_with_log!(leaks.len() == 2, "two leaks", 2usize, leaks.len());
crate::assert_with_log!(
leaks[0].region_id == root,
"root first",
true,
leaks[0].region_id == root
);
crate::assert_with_log!(
leaks[1].region_id == child,
"child second",
true,
leaks[1].region_id == child
);
crate::assert_with_log!(
leaks[0].obligation_id == root_ob,
"root obligation id",
true,
leaks[0].obligation_id == root_ob
);
crate::assert_with_log!(
leaks[1].obligation_id == child_ob,
"child obligation id",
true,
leaks[1].obligation_id == child_ob
);
let root_age_ms = leaks[0].age.as_millis();
let child_age_ms = leaks[1].age.as_millis();
crate::assert_with_log!(root_age_ms == 90, "root age", 90u128, root_age_ms);
crate::assert_with_log!(child_age_ms == 80, "child age", 80u128, child_age_ms);
crate::test_complete!("test_find_leaked_obligations_sorted_and_aged");
}
#[test]
fn test_find_leaked_obligations_uses_state_clock_without_timer_driver() {
init_test("test_find_leaked_obligations_uses_state_clock_without_timer_driver");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let task_id = insert_task(&mut state, root, TaskState::Running);
state.now = Time::from_millis(250);
let obligation_id = insert_obligation(
&mut state,
root,
task_id,
ObligationKind::Lease,
Time::from_millis(10),
);
let diagnostics = Diagnostics::new(Arc::new(state));
let leaks = diagnostics.find_leaked_obligations();
crate::assert_with_log!(leaks.len() == 1, "single leak", 1usize, leaks.len());
crate::assert_with_log!(
leaks[0].obligation_id == obligation_id,
"obligation id preserved",
true,
leaks[0].obligation_id == obligation_id
);
let age_ms = leaks[0].age.as_millis();
crate::assert_with_log!(age_ms == 240, "age uses state clock", 240u128, age_ms);
crate::test_complete!("test_find_leaked_obligations_uses_state_clock_without_timer_driver");
}
#[test]
fn reason_debug_clone() {
let r = Reason::RegionNotFound;
let r2 = r;
assert!(format!("{r2:?}").contains("RegionNotFound"));
}
#[test]
fn reason_display_all_variants() {
let r1 = Reason::RegionNotFound;
assert!(r1.to_string().contains("not found"));
let r2 = Reason::ChildRegionOpen {
child_id: RegionId::new_for_test(1, 0),
child_state: RegionState::Open,
};
assert!(r2.to_string().contains("child region"));
let r3 = Reason::TaskRunning {
task_id: TaskId::new_for_test(1, 0),
task_state: "Running".into(),
poll_count: 5,
};
assert!(r3.to_string().contains("task"));
assert!(r3.to_string().contains("polls=5"));
let r4 = Reason::ObligationHeld {
obligation_id: ObligationId::new_for_test(1, 0),
obligation_type: "Lease".into(),
holder_task: TaskId::new_for_test(2, 0),
};
assert!(r4.to_string().contains("obligation"));
assert!(r4.to_string().contains("Lease"));
}
#[test]
fn region_open_explanation_debug_clone() {
let explanation = RegionOpenExplanation {
region_id: RegionId::new_for_test(1, 0),
region_state: Some(RegionState::Open),
reasons: vec![Reason::RegionNotFound],
recommendations: vec!["check it".into()],
};
let explanation2 = explanation;
assert!(format!("{explanation2:?}").contains("RegionOpenExplanation"));
}
#[test]
fn region_open_explanation_display() {
let explanation = RegionOpenExplanation {
region_id: RegionId::new_for_test(1, 0),
region_state: Some(RegionState::Open),
reasons: vec![Reason::RegionNotFound],
recommendations: vec!["fix it".into()],
};
let s = explanation.to_string();
assert!(s.contains("still open"));
assert!(s.contains("fix it"));
}
#[test]
fn task_blocked_explanation_debug_clone() {
let explanation = TaskBlockedExplanation {
task_id: TaskId::new_for_test(1, 0),
block_reason: BlockReason::NotStarted,
details: vec!["detail".into()],
recommendations: vec!["wait".into()],
};
let explanation2 = explanation;
assert!(format!("{explanation2:?}").contains("TaskBlockedExplanation"));
}
#[test]
fn task_blocked_explanation_display() {
let explanation = TaskBlockedExplanation {
task_id: TaskId::new_for_test(1, 0),
block_reason: BlockReason::AwaitingSchedule,
details: vec!["pending wake".into()],
recommendations: vec!["wait for scheduler".into()],
};
let s = explanation.to_string();
assert!(s.contains("blocked"));
assert!(s.contains("awaiting schedule"));
}
#[test]
fn block_reason_debug_clone() {
let r = BlockReason::TaskNotFound;
let r2 = r;
assert!(format!("{r2:?}").contains("TaskNotFound"));
}
#[test]
fn block_reason_display_all_variants() {
let variants: Vec<BlockReason> = vec![
BlockReason::TaskNotFound,
BlockReason::NotStarted,
BlockReason::AwaitingSchedule,
BlockReason::AwaitingFuture {
description: "channel recv".into(),
},
BlockReason::CancelRequested {
reason: CancelReasonInfo {
kind: CancelKind::User,
message: Some("stop".into()),
},
},
BlockReason::RunningCleanup {
reason: CancelReasonInfo {
kind: CancelKind::User,
message: None,
},
polls_remaining: 10,
},
BlockReason::Finalizing {
reason: CancelReasonInfo {
kind: CancelKind::User,
message: None,
},
polls_remaining: 5,
},
BlockReason::Completed,
];
for v in &variants {
assert!(!v.to_string().is_empty());
}
}
#[test]
fn cancellation_explanation_debug_clone() {
let explanation = CancellationExplanation {
kind: CancelKind::User,
message: Some("timeout".into()),
propagation_path: vec![CancellationStep {
region_id: RegionId::new_for_test(1, 0),
kind: CancelKind::User,
}],
};
let explanation2 = explanation;
assert!(format!("{explanation2:?}").contains("CancellationExplanation"));
}
#[test]
fn cancellation_step_debug_clone() {
let step = CancellationStep {
region_id: RegionId::new_for_test(1, 0),
kind: CancelKind::User,
};
let step2 = step;
assert!(format!("{step2:?}").contains("CancellationStep"));
}
#[test]
fn cancel_reason_info_debug_clone_display() {
let info = CancelReasonInfo {
kind: CancelKind::User,
message: Some("stop".into()),
};
let info2 = info.clone();
assert!(format!("{info2:?}").contains("CancelReasonInfo"));
let s = info.to_string();
assert!(s.contains("stop"));
let info_no_msg = CancelReasonInfo {
kind: CancelKind::User,
message: None,
};
assert!(!info_no_msg.to_string().is_empty());
}
#[test]
fn obligation_leak_debug_clone() {
let leak = ObligationLeak {
obligation_id: ObligationId::new_for_test(1, 0),
obligation_type: "Ack".into(),
holder_task: Some(TaskId::new_for_test(2, 0)),
region_id: RegionId::new_for_test(1, 0),
age: std::time::Duration::from_secs(60),
};
let leak2 = leak;
assert!(format!("{leak2:?}").contains("ObligationLeak"));
}
#[test]
fn directional_deadlock_cycle_detection_reports_critical() {
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let t1 = insert_task(&mut state, root, TaskState::Running);
let t2 = insert_task(&mut state, root, TaskState::Running);
state.task_mut(t1).expect("t1").waiters.push(t2); state.task_mut(t2).expect("t2").waiters.push(t1);
let diagnostics = Diagnostics::new(Arc::new(state));
let report = diagnostics.analyze_directional_deadlock();
assert_eq!(report.severity, DeadlockSeverity::Critical);
assert!(!report.cycles.is_empty());
assert!(report.cycles[0].trapped);
assert!(report.cycles[0].tasks.contains(&t1));
assert!(report.cycles[0].tasks.contains(&t2));
}
#[test]
fn structural_health_reports_deadlocked_for_trapped_cycle() {
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let t1 = insert_task(&mut state, root, TaskState::Running);
let t2 = insert_task(&mut state, root, TaskState::Running);
state.task_mut(t1).expect("t1").waiters.push(t2);
state.task_mut(t2).expect("t2").waiters.push(t1);
let diagnostics = Diagnostics::new(Arc::new(state));
let report = diagnostics.analyze_structural_health();
assert!(matches!(
report.classification,
crate::observability::spectral_health::HealthClassification::Deadlocked
));
}
#[test]
fn explain_region_open_includes_directional_deadlock_recommendation() {
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let t1 = insert_task(&mut state, root, TaskState::Running);
let t2 = insert_task(&mut state, root, TaskState::Running);
state.task_mut(t1).expect("t1").waiters.push(t2);
state.task_mut(t2).expect("t2").waiters.push(t1);
let diagnostics = Diagnostics::new(Arc::new(state));
let explanation = diagnostics.explain_region_open(root);
assert!(
explanation
.recommendations
.iter()
.any(|r| r.contains("Directional deadlock risk")),
"expected directional deadlock recommendation"
);
}
#[test]
fn advanced_observability_contract_has_sorted_dimensions_and_classes() {
let contract = advanced_observability_contract();
let classes: Vec<&str> = contract
.event_classes
.iter()
.map(|item| item.class_id.as_str())
.collect();
let mut sorted_classes = classes.clone();
sorted_classes.sort_unstable();
sorted_classes.dedup();
assert_eq!(classes, sorted_classes);
let dimensions: Vec<&str> = contract
.troubleshooting_dimensions
.iter()
.map(|item| item.dimension.as_str())
.collect();
let mut sorted_dimensions = dimensions.clone();
sorted_dimensions.sort_unstable();
sorted_dimensions.dedup();
assert_eq!(dimensions, sorted_dimensions);
}
#[test]
fn classify_baseline_log_event_maps_known_event() {
let classified = classify_baseline_log_event(BaselineLogEvent {
flow_id: "execution",
event_kind: "command_start",
outcome_class: "success",
})
.expect("classification should succeed");
assert_eq!(classified.event_class, AdvancedEventClass::CommandLifecycle);
assert_eq!(classified.severity, AdvancedSeverity::Info);
assert!(classified.conflicts.is_empty());
assert!(
classified
.dimensions
.contains(&TroubleshootingDimension::OperatorAction)
);
}
#[test]
fn classify_baseline_log_event_detects_flow_event_conflict() {
let classified = classify_baseline_log_event(BaselineLogEvent {
flow_id: "execution",
event_kind: "integration_sync",
outcome_class: "success",
})
.expect("classification should succeed with conflict");
assert_eq!(classified.severity, AdvancedSeverity::Critical);
assert!(classified.conflicts.iter().any(|conflict| matches!(
conflict,
AdvancedClassificationConflict::FlowEventMismatch { .. }
)));
}
#[test]
fn classify_baseline_log_event_detects_outcome_event_conflict() {
let classified = classify_baseline_log_event(BaselineLogEvent {
flow_id: "integration",
event_kind: "integration_error",
outcome_class: "success",
})
.expect("classification should succeed with conflict");
assert_eq!(
classified.event_class,
AdvancedEventClass::IntegrationReliability
);
assert_eq!(classified.severity, AdvancedSeverity::Error);
assert!(classified.conflicts.iter().any(|conflict| matches!(
conflict,
AdvancedClassificationConflict::OutcomeEventMismatch { .. }
)));
}
#[test]
fn classify_baseline_log_events_is_deterministic() {
let stream = vec![
BaselineLogEvent {
flow_id: "execution",
event_kind: "command_start",
outcome_class: "success",
},
BaselineLogEvent {
flow_id: "execution",
event_kind: "verification_summary",
outcome_class: "failed",
},
BaselineLogEvent {
flow_id: "replay",
event_kind: "replay_complete",
outcome_class: "cancelled",
},
];
let a = classify_baseline_log_events(&stream).expect("stream classification should pass");
let b = classify_baseline_log_events(&stream).expect("stream classification should pass");
assert_eq!(a, b);
assert!(!a.is_empty());
assert!(a.iter().all(|entry| !entry.narrative.is_empty()));
}
#[test]
fn classify_baseline_log_event_rejects_unknown_tokens() {
let err = classify_baseline_log_event(BaselineLogEvent {
flow_id: "unknown",
event_kind: "command_start",
outcome_class: "success",
})
.expect_err("unknown flow must be rejected");
assert!(err.contains("unknown flow_id"));
}
#[test]
fn structured_diagnostic_report_snapshot_happy_path() {
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let task_id = insert_task(&mut state, root, TaskState::Completed(Outcome::Ok(())));
let diagnostics = Diagnostics::new(Arc::new(state));
let rendered = render_structured_diagnostic_report(
"happy_path",
"2026-04-20T22:00:00Z",
None,
Some(&diagnostics.explain_task_blocked(task_id)),
&[],
);
let scrubbed = scrub_diagnostic_report_timestamps(&rendered);
assert_diagnostic_report_snapshot("observability_diagnostics_happy_path", &scrubbed);
}
#[test]
fn structured_diagnostic_report_snapshot_v2_happy_and_degraded() {
let mut happy_state = RuntimeState::new();
let happy_root = happy_state.create_root_region(Budget::INFINITE);
let happy_task = insert_task(
&mut happy_state,
happy_root,
TaskState::Completed(Outcome::Ok(())),
);
let happy_diagnostics = Diagnostics::new(Arc::new(happy_state));
let happy_rendered = render_structured_diagnostic_report(
"happy_path",
"2026-04-20T22:00:00Z",
None,
Some(&happy_diagnostics.explain_task_blocked(happy_task)),
&[],
);
let happy_scrubbed = scrub_diagnostic_report_timestamps(&happy_rendered);
let mut degraded_state = RuntimeState::new();
let degraded_root = degraded_state.create_root_region(Budget::INFINITE);
let degraded_child = insert_child_region(&mut degraded_state, degraded_root);
let clock = Arc::new(VirtualClock::starting_at(Time::from_millis(5_000)));
degraded_state.set_timer_driver(TimerDriverHandle::with_virtual_clock(Arc::clone(&clock)));
let degraded_root_task =
insert_task(&mut degraded_state, degraded_root, TaskState::Running);
degraded_state
.task_mut(degraded_root_task)
.expect("root task missing")
.total_polls = 9;
let degraded_child_task = insert_task(
&mut degraded_state,
degraded_child,
TaskState::CancelRequested {
reason: CancelReason::shutdown().with_message("node draining"),
cleanup_budget: Budget::new().with_poll_quota(16),
},
);
let _degraded_root_obligation = insert_obligation(
&mut degraded_state,
degraded_root,
degraded_root_task,
ObligationKind::Ack,
Time::from_millis(500),
);
let _degraded_child_obligation = insert_obligation(
&mut degraded_state,
degraded_child,
degraded_child_task,
ObligationKind::Lease,
Time::from_millis(750),
);
let degraded_diagnostics = Diagnostics::new(Arc::new(degraded_state));
let mut degraded_task = degraded_diagnostics.explain_task_blocked(degraded_child_task);
degraded_task
.details
.insert(0, "observed_at: 2026-04-20T22:00:05Z".to_string());
degraded_task
.recommendations
.push("Continue draining child region tasks before sealing shutdown.".to_string());
let degraded_leaks = degraded_diagnostics.find_leaked_obligations();
let degraded_rendered = render_structured_diagnostic_report(
"shutdown_drain",
"2026-04-20T22:00:05Z",
Some(°raded_diagnostics.explain_region_open(degraded_root)),
Some(°raded_task),
°raded_leaks,
);
let degraded_scrubbed = scrub_diagnostic_report_timestamps(°raded_rendered);
let rendered = render_structured_diagnostic_report_v2(&[
("happy", &happy_scrubbed),
("degraded", °raded_scrubbed),
]);
assert_diagnostic_report_snapshot(
"observability_diagnostics_structured_report_v2",
&rendered,
);
}
#[test]
fn structured_diagnostic_report_snapshot_v3_happy_degraded_and_critical() {
let mut happy_state = RuntimeState::new();
let happy_root = happy_state.create_root_region(Budget::INFINITE);
let happy_task = insert_task(
&mut happy_state,
happy_root,
TaskState::Completed(Outcome::Ok(())),
);
let happy_diagnostics = Diagnostics::new(Arc::new(happy_state));
let happy_rendered = render_structured_diagnostic_report(
"happy_path",
"2026-04-21T09:10:00Z",
None,
Some(&happy_diagnostics.explain_task_blocked(happy_task)),
&[],
);
let happy_scrubbed = scrub_diagnostic_report_timestamps(&happy_rendered);
let mut degraded_state = RuntimeState::new();
let degraded_root = degraded_state.create_root_region(Budget::INFINITE);
let degraded_child = insert_child_region(&mut degraded_state, degraded_root);
let degraded_clock = Arc::new(VirtualClock::starting_at(Time::from_millis(5_000)));
degraded_state.set_timer_driver(TimerDriverHandle::with_virtual_clock(Arc::clone(
°raded_clock,
)));
let degraded_root_task =
insert_task(&mut degraded_state, degraded_root, TaskState::Running);
degraded_state
.task_mut(degraded_root_task)
.expect("degraded root task missing")
.total_polls = 9;
let degraded_child_task = insert_task(
&mut degraded_state,
degraded_child,
TaskState::CancelRequested {
reason: CancelReason::shutdown().with_message("node draining"),
cleanup_budget: Budget::new().with_poll_quota(16),
},
);
let _degraded_root_obligation = insert_obligation(
&mut degraded_state,
degraded_root,
degraded_root_task,
ObligationKind::Ack,
Time::from_millis(500),
);
let _degraded_child_obligation = insert_obligation(
&mut degraded_state,
degraded_child,
degraded_child_task,
ObligationKind::Lease,
Time::from_millis(750),
);
let degraded_diagnostics = Diagnostics::new(Arc::new(degraded_state));
let mut degraded_task = degraded_diagnostics.explain_task_blocked(degraded_child_task);
degraded_task
.details
.insert(0, "observed_at: 2026-04-21T09:10:05Z".to_string());
degraded_task
.recommendations
.push("Continue draining child region tasks before sealing shutdown.".to_string());
let degraded_leaks = degraded_diagnostics.find_leaked_obligations();
let degraded_rendered = render_structured_diagnostic_report(
"shutdown_drain",
"2026-04-21T09:10:05Z",
Some(°raded_diagnostics.explain_region_open(degraded_root)),
Some(°raded_task),
°raded_leaks,
);
let degraded_scrubbed = scrub_diagnostic_report_timestamps(°raded_rendered);
let mut critical_state = RuntimeState::new();
let critical_root = critical_state.create_root_region(Budget::INFINITE);
let critical_clock = Arc::new(VirtualClock::starting_at(Time::from_millis(1_500)));
critical_state.set_timer_driver(TimerDriverHandle::with_virtual_clock(Arc::clone(
&critical_clock,
)));
let critical_t1 = insert_task(&mut critical_state, critical_root, TaskState::Running);
let critical_t2 = insert_task(&mut critical_state, critical_root, TaskState::Running);
critical_state
.task_mut(critical_t1)
.expect("critical task 1 missing")
.waiters
.push(critical_t2);
critical_state
.task_mut(critical_t2)
.expect("critical task 2 missing")
.waiters
.push(critical_t1);
let _critical_obligation = insert_obligation(
&mut critical_state,
critical_root,
critical_t1,
ObligationKind::Ack,
Time::from_millis(250),
);
let critical_diagnostics = Diagnostics::new(Arc::new(critical_state));
let mut critical_task = critical_diagnostics.explain_task_blocked(critical_t1);
critical_task
.details
.insert(0, "observed_at: 2026-04-21T09:10:09Z".to_string());
critical_task
.recommendations
.push("Break the trapped wait cycle before allowing retries.".to_string());
let critical_leaks = critical_diagnostics.find_leaked_obligations();
let critical_rendered = render_structured_diagnostic_report(
"critical_deadlock",
"2026-04-21T09:10:09Z",
Some(&critical_diagnostics.explain_region_open(critical_root)),
Some(&critical_task),
&critical_leaks,
);
let critical_scrubbed = scrub_diagnostic_report_timestamps(&critical_rendered);
let rendered = render_structured_diagnostic_report_v3(&[
DiagnosticReportV3Section {
label: "happy",
status: "passing",
accounting: diagnostic_resource_accounting(&happy_diagnostics, 0),
rendered: &happy_scrubbed,
},
DiagnosticReportV3Section {
label: "degraded",
status: "degraded",
accounting: diagnostic_resource_accounting(
°raded_diagnostics,
degraded_leaks.len(),
),
rendered: °raded_scrubbed,
},
DiagnosticReportV3Section {
label: "critical",
status: "critical",
accounting: diagnostic_resource_accounting(
&critical_diagnostics,
critical_leaks.len(),
),
rendered: &critical_scrubbed,
},
]);
assert_diagnostic_report_snapshot(
"observability_diagnostics_structured_report_v3",
&rendered,
);
}
#[test]
fn structured_diagnostic_report_snapshot_rate_limited() {
let task = TaskBlockedExplanation {
task_id: TaskId::new_for_test(11, 0),
block_reason: BlockReason::AwaitingFuture {
description: "rate-limited by token bucket".to_string(),
},
details: vec![
"next_retry_at: 2026-04-20T22:00:02Z".to_string(),
"queue_depth: 3".to_string(),
"limiter: outbound_http".to_string(),
],
recommendations: vec!["Wait for the limiter budget to replenish.".to_string()],
};
let rendered = render_structured_diagnostic_report(
"rate_limited",
"2026-04-20T22:00:01Z",
None,
Some(&task),
&[],
);
let scrubbed = scrub_diagnostic_report_timestamps(&rendered);
assert_diagnostic_report_snapshot("observability_diagnostics_rate_limited", &scrubbed);
}
#[test]
fn structured_diagnostic_report_snapshot_oom() {
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let task_id = insert_task(
&mut state,
root,
TaskState::CancelRequested {
reason: CancelReason::resource_unavailable().with_message("oom"),
cleanup_budget: Budget::new().with_poll_quota(64),
},
);
let diagnostics = Diagnostics::new(Arc::new(state));
let mut task = diagnostics.explain_task_blocked(task_id);
task.details
.insert(0, "observed_at: 2026-04-20T22:00:03Z".to_string());
task.details.push("headroom: 0.00".to_string());
task.details.push("allocator_lane: region_heap".to_string());
task.recommendations
.push("Relieve memory pressure before retrying.".to_string());
let rendered = render_structured_diagnostic_report(
"oom",
"2026-04-20T22:00:03Z",
Some(&diagnostics.explain_region_open(root)),
Some(&task),
&[],
);
let scrubbed = scrub_diagnostic_report_timestamps(&rendered);
assert_diagnostic_report_snapshot("observability_diagnostics_oom", &scrubbed);
}
#[test]
fn structured_diagnostic_report_snapshot_deadline_exceeded() {
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let clock = Arc::new(VirtualClock::starting_at(Time::from_millis(1_500)));
state.set_timer_driver(TimerDriverHandle::with_virtual_clock(Arc::clone(&clock)));
let task_id = insert_task(
&mut state,
root,
TaskState::Finalizing {
reason: CancelReason::deadline().with_message("deadline exceeded"),
cleanup_budget: Budget::new().with_poll_quota(42),
},
);
let _obligation_id = insert_obligation(
&mut state,
root,
task_id,
ObligationKind::Lease,
Time::from_millis(250),
);
let diagnostics = Diagnostics::new(Arc::new(state));
let mut task = diagnostics.explain_task_blocked(task_id);
task.details
.insert(0, "deadline_at: 2026-04-20T22:00:04Z".to_string());
task.recommendations
.push("Inspect cleanup latency and tighten downstream budgets.".to_string());
let leaks = diagnostics.find_leaked_obligations();
let rendered = render_structured_diagnostic_report(
"deadline_exceeded",
"2026-04-20T22:00:04Z",
Some(&diagnostics.explain_region_open(root)),
Some(&task),
&leaks,
);
let scrubbed = scrub_diagnostic_report_timestamps(&rendered);
assert_diagnostic_report_snapshot("observability_diagnostics_deadline_exceeded", &scrubbed);
}
#[test]
fn structured_diagnostic_report_snapshot_shutdown_drain() {
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let child = insert_child_region(&mut state, root);
let clock = Arc::new(VirtualClock::starting_at(Time::from_millis(5_000)));
state.set_timer_driver(TimerDriverHandle::with_virtual_clock(Arc::clone(&clock)));
let root_task = insert_task(&mut state, root, TaskState::Running);
state
.task_mut(root_task)
.expect("root task missing")
.total_polls = 9;
let child_task = insert_task(
&mut state,
child,
TaskState::CancelRequested {
reason: CancelReason::shutdown().with_message("node draining"),
cleanup_budget: Budget::new().with_poll_quota(16),
},
);
let _root_obligation = insert_obligation(
&mut state,
root,
root_task,
ObligationKind::Ack,
Time::from_millis(500),
);
let _child_obligation = insert_obligation(
&mut state,
child,
child_task,
ObligationKind::Lease,
Time::from_millis(750),
);
let diagnostics = Diagnostics::new(Arc::new(state));
let mut task = diagnostics.explain_task_blocked(child_task);
task.details
.insert(0, "observed_at: 2026-04-20T22:00:05Z".to_string());
task.recommendations
.push("Continue draining child region tasks before sealing shutdown.".to_string());
let leaks = diagnostics.find_leaked_obligations();
let rendered = render_structured_diagnostic_report(
"shutdown_drain",
"2026-04-20T22:00:05Z",
Some(&diagnostics.explain_region_open(root)),
Some(&task),
&leaks,
);
let scrubbed = scrub_diagnostic_report_timestamps(&rendered);
assert_diagnostic_report_snapshot("observability_diagnostics_shutdown_drain", &scrubbed);
}
#[test]
fn diagnostics_healthcheck_json_snapshot_scrubbed() {
init_test("diagnostics_healthcheck_json_snapshot_scrubbed");
let mut passing_state = RuntimeState::new();
let passing_root = passing_state.create_root_region(Budget::INFINITE);
let passing_region = passing_state
.region(passing_root)
.expect("passing root missing");
let did_close = passing_region.begin_close(None)
&& passing_region.begin_finalize()
&& passing_region.complete_close();
assert!(did_close, "passing root should close cleanly");
let passing = scrub_diagnostic_healthcheck_json(render_diagnostic_healthcheck_json(
&Diagnostics::new(Arc::new(passing_state)),
passing_root,
"2026-04-21T08:30:00Z",
4101,
));
assert_eq!(
passing.get("status").and_then(Value::as_str),
Some("passing")
);
let mut degraded_state = RuntimeState::new();
let degraded_root = degraded_state.create_root_region(Budget::INFINITE);
let degraded_tasks = insert_wait_path(&mut degraded_state, degraded_root, 11);
degraded_state
.task_mut(*degraded_tasks.first().expect("degraded path head"))
.expect("degraded head task missing")
.total_polls = 3;
let degraded_diagnostics = Diagnostics::new(Arc::new(degraded_state));
assert!(
matches!(
degraded_diagnostics
.analyze_structural_health()
.classification,
HealthClassification::Degraded { .. }
),
"expected degraded classification for chained wait path"
);
let degraded = scrub_diagnostic_healthcheck_json(render_diagnostic_healthcheck_json(
°raded_diagnostics,
degraded_root,
"2026-04-21T08:30:01Z",
4102,
));
assert_eq!(
degraded.get("status").and_then(Value::as_str),
Some("degraded")
);
let mut critical_state = RuntimeState::new();
let critical_root = critical_state.create_root_region(Budget::INFINITE);
let clock = Arc::new(VirtualClock::starting_at(Time::from_millis(1_500)));
critical_state.set_timer_driver(TimerDriverHandle::with_virtual_clock(clock));
let t1 = insert_task(&mut critical_state, critical_root, TaskState::Running);
let t2 = insert_task(&mut critical_state, critical_root, TaskState::Running);
critical_state
.task_mut(t1)
.expect("critical t1")
.waiters
.push(t2);
critical_state
.task_mut(t2)
.expect("critical t2")
.waiters
.push(t1);
let _critical_obligation = insert_obligation(
&mut critical_state,
critical_root,
t1,
ObligationKind::Ack,
Time::from_millis(250),
);
let critical_diagnostics = Diagnostics::new(Arc::new(critical_state));
assert_eq!(
critical_diagnostics.analyze_directional_deadlock().severity,
DeadlockSeverity::Critical
);
let critical = scrub_diagnostic_healthcheck_json(render_diagnostic_healthcheck_json(
&critical_diagnostics,
critical_root,
"2026-04-21T08:30:02Z",
4103,
));
assert_eq!(
critical.get("status").and_then(Value::as_str),
Some("critical")
);
assert_diagnostic_healthcheck_snapshot(
"observability_diagnostics_healthcheck_json",
&json!({
"passing": passing,
"degraded": degraded,
"critical": critical,
}),
);
}
#[test]
fn tail_latency_taxonomy_contract_has_unique_required_keys() {
let contract = tail_latency_taxonomy_contract();
let keys: Vec<&str> = contract
.required_log_fields
.iter()
.map(|field| field.key.as_str())
.collect();
let mut unique_keys = keys.clone();
unique_keys.sort_unstable();
unique_keys.dedup();
assert_eq!(keys.len(), unique_keys.len());
}
#[test]
fn tail_latency_taxonomy_contract_includes_unknown_bucket_and_signals() {
let contract = tail_latency_taxonomy_contract();
assert_eq!(
contract.contract_version,
TAIL_LATENCY_TAXONOMY_CONTRACT_VERSION
);
assert_eq!(contract.unknown_bucket_key, "tail.unknown.unmeasured_ns");
assert!(
contract
.required_log_fields
.iter()
.any(|field| field.key == contract.unknown_bucket_key && field.required)
);
assert!(contract.terms.iter().any(|term| {
term.term_id == "unknown"
&& term.direct_duration_key == "tail.unknown.unmeasured_ns"
&& term
.signals
.iter()
.any(|signal| signal.structured_log_key == "tail.unknown.unmeasured_ns")
}));
}
#[test]
fn tail_latency_taxonomy_contract_core_signals_have_existing_files() {
let contract = tail_latency_taxonomy_contract();
let repo_root = std::path::Path::new(env!("CARGO_MANIFEST_DIR"));
for signal in contract
.terms
.iter()
.flat_map(|term| term.signals.iter())
.filter(|signal| signal.core)
{
assert!(
repo_root.join(&signal.producer_file).exists(),
"producer file must exist: {}",
signal.producer_file
);
}
}
fn complete_tail_latency_sample() -> TailLatencyCompactSample {
TailLatencyCompactSample::new(12_000)
.with_ready_queue_depth(17)
.with_poll_count(5)
.with_events_received(9)
.with_retries_total_delay_ns(1_500)
.with_synchronization_lock_wait_ns(2_500)
.with_allocator_live_allocations(33)
.with_allocator_bytes_live(8_192)
}
#[test]
fn compact_tail_emitter_required_schema_keys_stay_stable() {
let event = emit_tail_latency_compact_event(
TailLatencyEmitterConfig::enabled_core(),
"scheduler-overload-smoke",
"event-0001",
complete_tail_latency_sample(),
)
.expect("emit should succeed")
.expect("enabled emitter should produce an event");
let contract = tail_latency_taxonomy_contract();
let required_keys: std::collections::BTreeSet<&str> = contract
.required_log_fields
.iter()
.map(|field| field.key.as_str())
.collect();
let actual_keys: std::collections::BTreeSet<&str> =
event.fields.keys().map(String::as_str).collect();
assert_eq!(actual_keys, required_keys);
assert_eq!(
event.schema_version,
TAIL_LATENCY_COMPACT_EVENT_SCHEMA_VERSION
);
assert_eq!(
event.taxonomy_version,
TAIL_LATENCY_TAXONOMY_CONTRACT_VERSION
);
assert_eq!(event.unknown_unmeasured_ns, 8_000);
}
#[test]
fn compact_tail_emitter_missing_producers_fall_back_to_unknown() {
let sample = TailLatencyCompactSample::new(9_000)
.with_ready_queue_depth(3)
.with_retries_total_delay_ns(1_000)
.with_allocator_live_allocations(7);
let event = emit_tail_latency_compact_event(
TailLatencyEmitterConfig::enabled_core(),
"partial-producer-smoke",
"event-missing",
sample,
)
.expect("missing producers should not abort emission")
.expect("enabled emitter should produce an event");
assert_eq!(event.unknown_unmeasured_ns, 8_000);
assert_eq!(
event.fields.get("tail.service.poll_count"),
Some(&TailLatencyFieldValue::Unsigned(0))
);
assert_eq!(
event.fields.get("tail.io_or_network.events_received"),
Some(&TailLatencyFieldValue::Unsigned(0))
);
assert!(
event
.missing_producers
.contains(&"tail.service.poll_count".to_string())
);
assert!(
event
.missing_producers
.contains(&"tail.io_or_network.events_received".to_string())
);
assert_eq!(
event.attribution_states.get("service").map(String::as_str),
Some("missing_producer")
);
assert_eq!(
event.attribution_states.get("unknown").map(String::as_str),
Some("residual")
);
}
#[test]
fn compact_tail_emitter_serialization_is_deterministic() {
let config = TailLatencyEmitterConfig::enabled_core().with_extended_allocator_bytes_live();
let sample = complete_tail_latency_sample();
let first = emit_tail_latency_compact_event(
config,
"deterministic-smoke",
"event-stable",
sample.clone(),
)
.expect("first emission")
.expect("first event");
let second =
emit_tail_latency_compact_event(config, "deterministic-smoke", "event-stable", sample)
.expect("second emission")
.expect("second event");
let first_json = serde_json::to_vec(&first).expect("serialize first");
let second_json = serde_json::to_vec(&second).expect("serialize second");
assert_eq!(first_json, second_json);
assert!(
first
.fields
.contains_key("tail.allocator_or_cache.bytes_live"),
"extended allocator byte pressure should be explicitly gated"
);
}
#[test]
fn compact_tail_emitter_disabled_mode_is_semantics_neutral() {
let event = emit_tail_latency_compact_event(
TailLatencyEmitterConfig::default(),
"",
"",
TailLatencyCompactSample::new(1).with_retries_total_delay_ns(2),
)
.expect("disabled emitter should skip validation");
assert_eq!(event, None);
}
#[test]
fn compact_tail_emitter_rejects_impossible_direct_duration() {
let error = emit_tail_latency_compact_event(
TailLatencyEmitterConfig::enabled_core(),
"bad-direct-duration",
"event-overflow",
TailLatencyCompactSample::new(1_000)
.with_retries_total_delay_ns(750)
.with_synchronization_lock_wait_ns(500),
)
.expect_err("direct durations over total latency must fail closed");
assert_eq!(
error,
TailLatencyEmitError::DirectDurationExceedsTotal {
total_latency_ns: 1_000,
direct_duration_ns: 1_250,
}
);
}
#[test]
fn compact_tail_emitter_extended_fields_are_gated_and_bounded() {
let core_event = emit_tail_latency_compact_event(
TailLatencyEmitterConfig::enabled_core(),
"core-only",
"event-core",
complete_tail_latency_sample(),
)
.expect("core event should emit")
.expect("core event");
let extended_event = emit_tail_latency_compact_event(
TailLatencyEmitterConfig::enabled_core().with_extended_allocator_bytes_live(),
"extended",
"event-extended",
complete_tail_latency_sample(),
)
.expect("extended event should emit")
.expect("extended event");
assert!(
!core_event
.fields
.contains_key("tail.allocator_or_cache.bytes_live")
);
assert!(
extended_event
.fields
.contains_key("tail.allocator_or_cache.bytes_live")
);
assert!(
core_event.overhead_estimate_bytes <= 2_048,
"compact core overhead estimate should remain bounded: {} bytes",
core_event.overhead_estimate_bytes
);
assert!(
extended_event.overhead_estimate_bytes <= 2_048,
"extended overhead estimate should remain bounded: {} bytes",
extended_event.overhead_estimate_bytes
);
}
fn latency_budget_gate() -> TailLatencyBudgetGate {
TailLatencyBudgetGate::new(20_000, 64, 20_000, 9_000, 10, 500)
}
fn latency_budget_event() -> TailLatencyCompactEvent {
emit_tail_latency_compact_event(
TailLatencyEmitterConfig::enabled_core(),
"LATENCY-BUDGET-PASS",
"event-0001",
TailLatencyCompactSample::new(18_000)
.with_ready_queue_depth(18)
.with_poll_count(7)
.with_events_received(4)
.with_retries_total_delay_ns(2_000)
.with_synchronization_lock_wait_ns(1_000)
.with_allocator_live_allocations(22),
)
.expect("latency budget event should emit")
.expect("enabled event should be present")
}
fn latency_budget_evidence() -> TailLatencyBudgetEvidence {
TailLatencyBudgetEvidence::new(
"latency-budget-cert-pass",
"LATENCY-BUDGET-PASS",
"candidate-balanced",
"conservative-baseline",
"bash scripts/run_latency_budget_certificate_smoke.sh --execute --scenario LATENCY-BUDGET-PASS",
latency_budget_gate(),
)
.with_sample_window(256, 7)
.with_quantiles(TailLatencyBudgetQuantiles::new(9_000, 13_000, 16_000, 17_000))
.with_uncertainty(TailLatencyBudgetUncertainty::new(250, 750))
.with_regression_window(17_500, 17_000)
.with_tail_events(vec![latency_budget_event()])
}
#[test]
fn latency_budget_certificate_accepts_direct_tail_evidence() {
let certificate = verify_tail_latency_budget_certificate(latency_budget_evidence());
assert_eq!(certificate.verdict, TailLatencyBudgetVerdict::Pass);
assert!(certificate.reason_codes.is_empty());
assert_eq!(
certificate.schema_version,
TAIL_LATENCY_BUDGET_CERTIFICATE_SCHEMA_VERSION
);
assert_eq!(
certificate.taxonomy_version,
TAIL_LATENCY_TAXONOMY_CONTRACT_VERSION
);
assert!(certificate.certificate_hash.starts_with("fnv1a64:"));
assert!(
certificate
.term_breakdown
.iter()
.any(|term| term.term_id == "retries"
&& term.measurement_class == "direct_duration"
&& term.direct_duration_ns == 2_000)
);
assert!(
certificate
.term_breakdown
.iter()
.any(|term| term.term_id == "unknown"
&& term.measurement_class == "unknown_bucket"
&& term.observed_value == certificate.unknown_residual_ns)
);
}
#[test]
fn latency_budget_certificate_rejects_missing_terms_and_hidden_unknown() {
let mut event = latency_budget_event();
event.attribution_states.remove("service");
event.fields.remove("tail.unknown.unmeasured_ns");
let certificate = verify_tail_latency_budget_certificate(
latency_budget_evidence().with_tail_events(vec![event]),
);
assert_eq!(certificate.verdict, TailLatencyBudgetVerdict::FailClosed);
assert!(
certificate
.reason_codes
.contains(&"missing_term:service".to_string())
);
assert!(
certificate
.reason_codes
.contains(&"missing_required_field:tail.unknown.unmeasured_ns".to_string())
);
assert!(
certificate
.reason_codes
.contains(&"hidden_unknown_bucket".to_string())
);
}
#[test]
fn latency_budget_certificate_rejects_stale_mean_only_and_proxy_only_evidence() {
let proxy_only = emit_tail_latency_compact_event(
TailLatencyEmitterConfig::enabled_core(),
"LATENCY-BUDGET-PROXY-ONLY",
"event-proxy",
TailLatencyCompactSample::new(1_000)
.with_ready_queue_depth(1)
.with_poll_count(1)
.with_events_received(1)
.with_allocator_live_allocations(1),
)
.expect("proxy-only event should emit")
.expect("enabled event should be present");
let certificate = verify_tail_latency_budget_certificate(
TailLatencyBudgetEvidence::new(
"latency-budget-cert-stale",
"LATENCY-BUDGET-PROXY-ONLY",
"candidate-proxy-only",
"conservative-baseline",
"bash scripts/run_latency_budget_certificate_smoke.sh --execute --scenario LATENCY-BUDGET-PROXY-ONLY",
latency_budget_gate(),
)
.with_sample_window(16, 11)
.with_uncertainty(TailLatencyBudgetUncertainty::new(0, 10))
.with_regression_window(1_000, 900)
.with_tail_events(vec![proxy_only]),
);
assert_eq!(certificate.verdict, TailLatencyBudgetVerdict::FailClosed);
for reason in [
"insufficient_sample_count",
"stale_calibration",
"missing_quantiles_mean_only_evidence",
"proxy_only_green_row",
] {
assert!(
certificate.reason_codes.contains(&reason.to_string()),
"missing reason {reason}: {:?}",
certificate.reason_codes
);
}
}
#[test]
fn latency_budget_certificate_returns_no_win_for_budget_or_regression_miss() {
let certificate = verify_tail_latency_budget_certificate(
latency_budget_evidence()
.with_quantiles(TailLatencyBudgetQuantiles::new(
9_000, 18_000, 20_000, 22_000,
))
.with_uncertainty(TailLatencyBudgetUncertainty::new(500, 1_000))
.with_regression_window(17_500, 21_000),
);
assert_eq!(certificate.verdict, TailLatencyBudgetVerdict::NoWin);
assert!(
certificate
.reason_codes
.contains(&"p999_budget_exceeded".to_string())
);
assert!(
certificate
.reason_codes
.contains(&"asymmetric_regression_gate".to_string())
);
assert_eq!(
certificate.fallback_reason.as_deref(),
Some("p999_budget_exceeded")
);
}
fn wait_task(index: u32) -> TaskId {
TaskId::new_for_test(index, 0)
}
fn wait_region(index: u32) -> RegionId {
RegionId::new_for_test(index, 0)
}
fn wait_obligation(index: u32) -> ObligationId {
ObligationId::new_for_test(index, 0)
}
fn trapped_deadlock_report() -> DirectionalDeadlockReport {
DirectionalDeadlockReport {
severity: DeadlockSeverity::Critical,
risk_score: 1.0,
cycles: vec![DeadlockCycle {
tasks: vec![wait_task(1), wait_task(2)],
ingress_edges: 0,
egress_edges: 0,
trapped: true,
}],
}
}
fn base_wait_cause_evidence() -> WaitCauseRemediationEvidence {
WaitCauseRemediationEvidence::new(
"wait-cause-report-pass",
"WAIT-CAUSE-PASS",
"bash scripts/run_wait_cause_remediation_smoke.sh --execute --scenario WAIT-CAUSE-PASS",
)
.with_evidence_refs(vec![
"artifacts/runtime_latency_budget_certificate_v1.json".to_string(),
"artifacts/runtime_tail_latency_taxonomy_v1.json".to_string(),
])
}
#[test]
fn wait_cause_report_ranks_deadlock_before_futurelock_and_unknown_wait() {
let report = build_wait_cause_remediation_report(
base_wait_cause_evidence()
.with_deadlock_report(trapped_deadlock_report())
.with_task_waits(vec![
WaitCauseTaskEvidence::new(
wait_task(9),
Some(wait_region(3)),
WaitCauseTaskWaitKind::Unknown,
"opaque await point",
)
.with_wait_age_ns(5_000),
WaitCauseTaskEvidence::new(
wait_task(7),
Some(wait_region(2)),
WaitCauseTaskWaitKind::AwaitingFuture,
"channel receive producer",
)
.with_wait_age_ns(9_000),
]),
);
assert_eq!(report.verdict, WaitCauseRemediationVerdict::Actionable);
assert_eq!(report.findings.len(), 3);
assert_eq!(
report.findings[0].category,
WaitCauseCategory::DeadlockCycle
);
assert_eq!(report.findings[0].severity, WaitCauseSeverity::Critical);
assert_eq!(report.findings[1].category, WaitCauseCategory::Futurelock);
assert_eq!(report.findings[2].category, WaitCauseCategory::UnknownWait);
assert_eq!(report.findings[0].rank, 1);
assert_eq!(report.findings[1].rank, 2);
assert!(report.report_hash.starts_with("fnv1a64:"));
assert!(report.wait_cause_graph_hash.starts_with("fnv1a64:"));
}
#[test]
fn wait_cause_report_surfaces_obligation_metadata_and_safe_actions() {
let report =
build_wait_cause_remediation_report(base_wait_cause_evidence().with_obligation_leaks(
vec![WaitCauseObligationEvidence::new(
wait_obligation(11),
"SendPermit",
Some(wait_task(4)),
wait_region(5),
42_000,
)],
));
assert_eq!(report.verdict, WaitCauseRemediationVerdict::Actionable);
assert_eq!(report.findings.len(), 1);
let finding = &report.findings[0];
assert_eq!(finding.category, WaitCauseCategory::ObligationLeak);
assert_eq!(finding.reason_code, "reserved_obligation_still_held");
assert_eq!(finding.owner_task_id, Some(format!("{:?}", wait_task(4))));
assert_eq!(
finding.owner_region_id,
Some(format!("{:?}", wait_region(5)))
);
assert!(
finding
.safe_actions
.iter()
.any(|action| action.contains("commit or abort"))
);
assert!(
finding
.forbidden_actions
.iter()
.any(|action| action.contains("Do not delete files"))
);
}
#[test]
fn wait_cause_report_refuses_missing_replay_and_wrong_taxonomy() {
let report = build_wait_cause_remediation_report(
WaitCauseRemediationEvidence::new("bad-report", "BAD", "")
.with_tail_taxonomy_version("runtime-tail-latency-taxonomy-v0")
.with_task_waits(vec![WaitCauseTaskEvidence::new(
wait_task(1),
None,
WaitCauseTaskWaitKind::AwaitingFuture,
"producer",
)]),
);
assert_eq!(report.verdict, WaitCauseRemediationVerdict::Refused);
assert_eq!(report.findings.len(), 0);
assert_eq!(
report.refusal_reason.as_deref(),
Some("missing_replay_command")
);
}
#[test]
fn wait_cause_report_investigates_unknown_wait_and_redacts_operator_text() {
let report =
build_wait_cause_remediation_report(base_wait_cause_evidence().with_task_waits(vec![
WaitCauseTaskEvidence::new(
wait_task(3),
Some(wait_region(1)),
WaitCauseTaskWaitKind::Unknown,
"socket /tmp/secret\nhost=internal.example.com\tuser=a@example.com",
)
.with_wait_age_ns(1_000)
.with_wake_pending(true)
.with_details(vec!["raw\ncontrol\tchars".to_string()]),
]));
assert_eq!(report.verdict, WaitCauseRemediationVerdict::Investigate);
assert_eq!(report.findings.len(), 1);
let finding = &report.findings[0];
assert_eq!(finding.category, WaitCauseCategory::UnknownWait);
assert!(!finding.blocked_resource.contains('\n'));
assert!(!finding.blocked_resource.contains('\t'));
assert!(!finding.blocked_resource.contains("/tmp/secret"));
assert!(!finding.blocked_resource.contains("a@example.com"));
assert!(finding.blocked_resource.contains("[redacted-path]"));
assert!(finding.blocked_resource.contains("[redacted-identity]"));
assert!(
report
.safe_actions
.iter()
.any(|action| action.contains("fresh diagnostics snapshot"))
);
}
#[test]
fn diagnostics_debug() {
let state = Arc::new(RuntimeState::new());
let diagnostics = Diagnostics::new(state);
assert!(format!("{diagnostics:?}").contains("Diagnostics"));
}
#[test]
fn introspection_conf_001_diagnostic_query_result_idempotence() {
init_test("introspection_conf_001_diagnostic_query_result_idempotence");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let task_id = insert_task(&mut state, root, TaskState::Running);
let diagnostics = Diagnostics::new(Arc::new(state));
let explanation1 = diagnostics.explain_region_open(root);
let explanation2 = diagnostics.explain_region_open(root);
let explanation3 = diagnostics.explain_region_open(root);
crate::assert_with_log!(
explanation1.region_id == explanation2.region_id,
"region_id idempotent",
true,
explanation1.region_id == explanation2.region_id
);
crate::assert_with_log!(
explanation2.region_id == explanation3.region_id,
"region_id triple check",
true,
explanation2.region_id == explanation3.region_id
);
crate::assert_with_log!(
explanation1.reasons.len() == explanation2.reasons.len(),
"reasons count idempotent",
explanation1.reasons.len(),
explanation2.reasons.len()
);
crate::assert_with_log!(
explanation2.reasons.len() == explanation3.reasons.len(),
"reasons count triple check",
explanation2.reasons.len(),
explanation3.reasons.len()
);
let task1 = diagnostics.explain_task_blocked(task_id);
let task2 = diagnostics.explain_task_blocked(task_id);
let task3 = diagnostics.explain_task_blocked(task_id);
crate::assert_with_log!(
task1.task_id == task2.task_id,
"task_id idempotent",
true,
task1.task_id == task2.task_id
);
crate::assert_with_log!(
task2.task_id == task3.task_id,
"task_id triple check",
true,
task2.task_id == task3.task_id
);
let leaks1 = diagnostics.find_leaked_obligations();
let leaks2 = diagnostics.find_leaked_obligations();
let leaks3 = diagnostics.find_leaked_obligations();
crate::assert_with_log!(
leaks1.len() == leaks2.len(),
"leaks count idempotent",
leaks1.len(),
leaks2.len()
);
crate::assert_with_log!(
leaks2.len() == leaks3.len(),
"leaks count triple check",
leaks2.len(),
leaks3.len()
);
crate::test_complete!("introspection_conf_001_diagnostic_query_result_idempotence");
}
#[test]
fn introspection_conf_002_deterministic_ordering_of_diagnostic_results() {
init_test("introspection_conf_002_deterministic_ordering_of_diagnostic_results");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let child1 = insert_child_region(&mut state, root);
let child2 = insert_child_region(&mut state, root);
let child3 = insert_child_region(&mut state, root);
let task1 = insert_task(&mut state, child1, TaskState::Running);
let task2 = insert_task(&mut state, child2, TaskState::Running);
let task3 = insert_task(&mut state, child3, TaskState::Running);
let clock = Arc::new(VirtualClock::starting_at(Time::from_millis(100)));
state.set_timer_driver(TimerDriverHandle::with_virtual_clock(clock));
let _ob1 = insert_obligation(
&mut state,
child1,
task1,
ObligationKind::Ack,
Time::from_millis(10),
);
let _ob3 = insert_obligation(
&mut state,
child3,
task3,
ObligationKind::Lease,
Time::from_millis(30),
);
let _ob2 = insert_obligation(
&mut state,
child2,
task2,
ObligationKind::SendPermit,
Time::from_millis(20),
);
let diagnostics = Diagnostics::new(Arc::new(state));
let leaks_run1 = diagnostics.find_leaked_obligations();
let leaks_run2 = diagnostics.find_leaked_obligations();
let leaks_run3 = diagnostics.find_leaked_obligations();
crate::assert_with_log!(
leaks_run1.len() == 3,
"leak count consistent",
3usize,
leaks_run1.len()
);
for i in 0..3 {
crate::assert_with_log!(
leaks_run1[i].obligation_id == leaks_run2[i].obligation_id,
&format!("obligation_id ordering run1==run2 idx {}", i),
true,
leaks_run1[i].obligation_id == leaks_run2[i].obligation_id
);
crate::assert_with_log!(
leaks_run2[i].obligation_id == leaks_run3[i].obligation_id,
&format!("obligation_id ordering run2==run3 idx {}", i),
true,
leaks_run2[i].obligation_id == leaks_run3[i].obligation_id
);
crate::assert_with_log!(
leaks_run1[i].region_id == leaks_run2[i].region_id,
&format!("region_id ordering run1==run2 idx {}", i),
true,
leaks_run1[i].region_id == leaks_run2[i].region_id
);
}
let explanation1 = diagnostics.explain_region_open(root);
let explanation2 = diagnostics.explain_region_open(root);
crate::assert_with_log!(
explanation1.reasons.len() == explanation2.reasons.len(),
"reason count deterministic",
explanation1.reasons.len(),
explanation2.reasons.len()
);
for i in 0..explanation1.reasons.len().min(explanation2.reasons.len()) {
let reason1_desc = format!("{:?}", explanation1.reasons[i]);
let reason2_desc = format!("{:?}", explanation2.reasons[i]);
crate::assert_with_log!(
reason1_desc == reason2_desc,
&format!("reason ordering idx {}", i),
true,
reason1_desc == reason2_desc
);
}
crate::test_complete!(
"introspection_conf_002_deterministic_ordering_of_diagnostic_results"
);
}
#[test]
fn introspection_conf_003_serialization_roundtrip_correctness() {
init_test("introspection_conf_003_serialization_roundtrip_correctness");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let task_id = insert_task(&mut state, root, TaskState::Running);
let clock = Arc::new(VirtualClock::starting_at(Time::from_millis(500)));
state.set_timer_driver(TimerDriverHandle::with_virtual_clock(clock));
let ob_id = insert_obligation(
&mut state,
root,
task_id,
ObligationKind::Ack,
Time::from_millis(100),
);
let diagnostics = Diagnostics::new(Arc::new(state));
let leaks = diagnostics.find_leaked_obligations();
crate::assert_with_log!(
!leaks.is_empty(),
"has leaks for serialization test",
true,
!leaks.is_empty()
);
for leak in &leaks {
crate::assert_with_log!(
leak.obligation_id == ob_id,
"obligation_id preserved",
true,
leak.obligation_id == ob_id
);
crate::assert_with_log!(
!leak.obligation_type.is_empty(),
"obligation_type serializable",
true,
!leak.obligation_type.is_empty()
);
crate::assert_with_log!(
leak.holder_task == Some(task_id),
"holder_task preserved",
true,
leak.holder_task == Some(task_id)
);
crate::assert_with_log!(
leak.region_id == root,
"region_id preserved",
true,
leak.region_id == root
);
crate::assert_with_log!(
leak.age.as_millis() > 0,
"age computed and serializable",
true,
leak.age.as_millis() > 0
);
let debug_repr = format!("{:?}", leak);
crate::assert_with_log!(
debug_repr.contains("ObligationLeak"),
"debug serialization includes type",
true,
debug_repr.contains("ObligationLeak")
);
crate::assert_with_log!(
debug_repr.contains(&leak.obligation_type),
"debug serialization includes obligation_type",
true,
debug_repr.contains(&leak.obligation_type)
);
}
let explanation = diagnostics.explain_region_open(root);
let display_repr = format!("{}", explanation);
crate::assert_with_log!(
display_repr.contains("Region"),
"display contains region marker",
true,
display_repr.contains("Region")
);
crate::assert_with_log!(
!display_repr.is_empty(),
"display produces non-empty output",
true,
!display_repr.is_empty()
);
let task_explanation = diagnostics.explain_task_blocked(task_id);
let task_display = format!("{}", task_explanation);
crate::assert_with_log!(
task_display.contains("Task"),
"task display contains task marker",
true,
task_display.contains("Task")
);
crate::assert_with_log!(
!task_display.is_empty(),
"task display produces non-empty output",
true,
!task_display.is_empty()
);
crate::test_complete!("introspection_conf_003_serialization_roundtrip_correctness");
}
#[test]
fn introspection_conf_004_scheduler_state_accuracy_in_diagnostics() {
init_test("introspection_conf_004_scheduler_state_accuracy_in_diagnostics");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let running_task = insert_task(&mut state, root, TaskState::Running);
let completed_task = insert_task(&mut state, root, TaskState::Completed(Outcome::Ok(())));
let cancel_task = insert_task(
&mut state,
root,
TaskState::CancelRequested {
reason: CancelReason::user("test"),
cleanup_budget: Budget::with_deadline_ns(100_000_000), },
);
let running_task_record = state.task_mut(running_task).expect("running task");
running_task_record.total_polls = 5;
running_task_record.last_polled_step = 100;
let notified = running_task_record.wake_state.notify();
crate::assert_with_log!(notified, "wake state notified", true, notified);
let diagnostics = Diagnostics::new(Arc::new(state));
let running_explanation = diagnostics.explain_task_blocked(running_task);
crate::assert_with_log!(
matches!(
running_explanation.block_reason,
BlockReason::AwaitingSchedule
),
"running task shows awaiting schedule",
true,
matches!(
running_explanation.block_reason,
BlockReason::AwaitingSchedule
)
);
let completed_explanation = diagnostics.explain_task_blocked(completed_task);
crate::assert_with_log!(
matches!(completed_explanation.block_reason, BlockReason::Completed),
"completed task shows completed",
true,
matches!(completed_explanation.block_reason, BlockReason::Completed)
);
let cancel_explanation = diagnostics.explain_task_blocked(cancel_task);
let is_cancel_requested = matches!(
cancel_explanation.block_reason,
BlockReason::CancelRequested { .. }
);
crate::assert_with_log!(
is_cancel_requested,
"cancel requested task shows cancel",
true,
is_cancel_requested
);
let region_explanation = diagnostics.explain_region_open(root);
let mut found_running = false;
let found_completed = false;
let mut found_cancel = false;
for reason in ®ion_explanation.reasons {
match reason {
Reason::TaskRunning {
task_id,
poll_count,
..
} if *task_id == running_task => {
found_running = true;
crate::assert_with_log!(
*poll_count == 5,
"poll count accuracy",
5u64,
*poll_count
);
}
Reason::TaskRunning { task_id, .. } if *task_id == cancel_task => {
found_cancel = true;
}
_ => {}
}
}
crate::assert_with_log!(
found_running,
"found running task reason",
true,
found_running
);
crate::assert_with_log!(found_cancel, "found cancel task reason", true, found_cancel);
crate::assert_with_log!(
!found_completed,
"completed task not blocking",
true,
!found_completed
);
crate::test_complete!("introspection_conf_004_scheduler_state_accuracy_in_diagnostics");
}
#[test]
fn introspection_conf_005_cross_method_consistency_of_diagnostic_data() {
init_test("introspection_conf_005_cross_method_consistency_of_diagnostic_data");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let task_id = insert_task(&mut state, root, TaskState::Running);
let clock = Arc::new(VirtualClock::starting_at(Time::from_millis(300)));
state.set_timer_driver(TimerDriverHandle::with_virtual_clock(clock));
let ob_id = insert_obligation(
&mut state,
root,
task_id,
ObligationKind::SendPermit,
Time::from_millis(50),
);
let diagnostics = Diagnostics::new(Arc::new(state));
let region_explanation = diagnostics.explain_region_open(root);
let task_explanation = diagnostics.explain_task_blocked(task_id);
let leaked_obligations = diagnostics.find_leaked_obligations();
let mut task_in_region_reasons = false;
for reason in ®ion_explanation.reasons {
if let Reason::TaskRunning { task_id: id, .. } = reason {
if *id == task_id {
task_in_region_reasons = true;
break;
}
}
}
crate::assert_with_log!(
task_in_region_reasons,
"task appears in region explanation",
true,
task_in_region_reasons
);
crate::assert_with_log!(
task_explanation.task_id == task_id,
"task explanation has correct ID",
true,
task_explanation.task_id == task_id
);
let mut obligation_in_region_reasons = false;
for reason in ®ion_explanation.reasons {
if let Reason::ObligationHeld {
obligation_id: id,
holder_task,
..
} = reason
{
if *id == ob_id && *holder_task == task_id {
obligation_in_region_reasons = true;
break;
}
}
}
crate::assert_with_log!(
obligation_in_region_reasons,
"obligation appears in region explanation",
true,
obligation_in_region_reasons
);
crate::assert_with_log!(
!leaked_obligations.is_empty(),
"obligation appears in leak detection",
true,
!leaked_obligations.is_empty()
);
let found_leak = leaked_obligations
.iter()
.any(|leak| leak.obligation_id == ob_id && leak.holder_task == Some(task_id));
crate::assert_with_log!(
found_leak,
"leak detection finds same obligation",
true,
found_leak
);
for leak in &leaked_obligations {
if leak.obligation_id == ob_id {
crate::assert_with_log!(
leak.age.as_millis() == 250, "age calculation consistent",
250u128,
leak.age.as_millis()
);
}
}
crate::test_complete!("introspection_conf_005_cross_method_consistency_of_diagnostic_data");
}
#[test]
fn introspection_conf_006_temporal_consistency_of_diagnostic_queries() {
init_test("introspection_conf_006_temporal_consistency_of_diagnostic_queries");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let task_id = insert_task(&mut state, root, TaskState::Running);
let clock = Arc::new(VirtualClock::starting_at(Time::from_millis(1000)));
state.set_timer_driver(TimerDriverHandle::with_virtual_clock(Arc::clone(&clock)));
let ob_id = insert_obligation(
&mut state,
root,
task_id,
ObligationKind::Ack,
Time::from_millis(100),
);
let diagnostics = Diagnostics::new(Arc::new(state));
let initial_leaks = diagnostics.find_leaked_obligations();
crate::assert_with_log!(
initial_leaks.len() == 1,
"initial leak count",
1usize,
initial_leaks.len()
);
let initial_age = initial_leaks[0].age.as_millis();
crate::assert_with_log!(
initial_age == 900, "initial age calculation",
900u128,
initial_age
);
clock.advance_to(Time::from_millis(2000));
let later_leaks = diagnostics.find_leaked_obligations();
crate::assert_with_log!(
later_leaks.len() == 1,
"leak count preserved after clock advance",
1usize,
later_leaks.len()
);
let later_age = later_leaks[0].age.as_millis();
crate::assert_with_log!(
later_age == 1900, "updated age calculation",
1900u128,
later_age
);
crate::assert_with_log!(
later_leaks[0].obligation_id == ob_id,
"obligation ID preserved across time",
true,
later_leaks[0].obligation_id == ob_id
);
let region_explanation = diagnostics.explain_region_open(root);
let mut obligation_reason_found = false;
for reason in ®ion_explanation.reasons {
if let Reason::ObligationHeld {
obligation_id: id, ..
} = reason
{
if *id == ob_id {
obligation_reason_found = true;
break;
}
}
}
crate::assert_with_log!(
obligation_reason_found,
"temporal consistency across methods",
true,
obligation_reason_found
);
crate::test_complete!("introspection_conf_006_temporal_consistency_of_diagnostic_queries");
}
#[test]
fn introspection_conf_007_resource_leak_detection_accuracy() {
init_test("introspection_conf_007_resource_leak_detection_accuracy");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let child1 = insert_child_region(&mut state, root);
let child2 = insert_child_region(&mut state, root);
let task1 = insert_task(&mut state, child1, TaskState::Running);
let task2 = insert_task(&mut state, child2, TaskState::Running);
let completed_task = insert_task(&mut state, root, TaskState::Completed(Outcome::Ok(())));
let clock = Arc::new(VirtualClock::starting_at(Time::from_millis(1000)));
state.set_timer_driver(TimerDriverHandle::with_virtual_clock(clock));
let leak1 = insert_obligation(
&mut state,
child1,
task1,
ObligationKind::Ack,
Time::from_millis(100),
);
let leak2 = insert_obligation(
&mut state,
child2,
task2,
ObligationKind::SendPermit,
Time::from_millis(200),
);
let _non_leak = insert_obligation(
&mut state,
root,
completed_task,
ObligationKind::Lease,
Time::from_millis(300),
);
let diagnostics = Diagnostics::new(Arc::new(state));
let leaks = diagnostics.find_leaked_obligations();
crate::assert_with_log!(leaks.len() == 2, "accurate leak count", 2usize, leaks.len());
let leak_ids: Vec<ObligationId> = leaks.iter().map(|l| l.obligation_id).collect();
crate::assert_with_log!(
leak_ids.contains(&leak1),
"detects first leak",
true,
leak_ids.contains(&leak1)
);
crate::assert_with_log!(
leak_ids.contains(&leak2),
"detects second leak",
true,
leak_ids.contains(&leak2)
);
for leak in &leaks {
match leak.obligation_id {
id if id == leak1 => {
crate::assert_with_log!(
leak.age.as_millis() == 900, "leak1 age accuracy",
900u128,
leak.age.as_millis()
);
crate::assert_with_log!(
leak.holder_task == Some(task1),
"leak1 holder accuracy",
true,
leak.holder_task == Some(task1)
);
}
id if id == leak2 => {
crate::assert_with_log!(
leak.age.as_millis() == 800, "leak2 age accuracy",
800u128,
leak.age.as_millis()
);
crate::assert_with_log!(
leak.holder_task == Some(task2),
"leak2 holder accuracy",
true,
leak.holder_task == Some(task2)
);
}
_ => {
crate::assert_with_log!(false, "unexpected leak detected", true, false);
}
}
}
crate::assert_with_log!(
leaks[0].region_id <= leaks[1].region_id,
"leaks sorted by region",
true,
leaks[0].region_id <= leaks[1].region_id
);
crate::test_complete!("introspection_conf_007_resource_leak_detection_accuracy");
}
#[test]
fn introspection_conf_008_deadlock_detection_determinism() {
init_test("introspection_conf_008_deadlock_detection_determinism");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let task_a = insert_task(&mut state, root, TaskState::Running);
let task_b = insert_task(&mut state, root, TaskState::Running);
let task_c = insert_task(&mut state, root, TaskState::Running);
state.task_mut(task_a).expect("task A").waiters.push(task_b);
state.task_mut(task_b).expect("task B").waiters.push(task_c);
state.task_mut(task_c).expect("task C").waiters.push(task_a);
let diagnostics = Diagnostics::new(Arc::new(state));
let report1 = diagnostics.analyze_directional_deadlock();
let report2 = diagnostics.analyze_directional_deadlock();
let report3 = diagnostics.analyze_directional_deadlock();
crate::assert_with_log!(
report1.severity == report2.severity,
"severity deterministic run1==run2",
true,
report1.severity == report2.severity
);
crate::assert_with_log!(
report2.severity == report3.severity,
"severity deterministic run2==run3",
true,
report2.severity == report3.severity
);
crate::assert_with_log!(
matches!(report1.severity, DeadlockSeverity::Critical),
"detects critical deadlock",
true,
matches!(report1.severity, DeadlockSeverity::Critical)
);
crate::assert_with_log!(
report1.cycles.len() == report2.cycles.len(),
"cycle count deterministic run1==run2",
report1.cycles.len(),
report2.cycles.len()
);
crate::assert_with_log!(
report2.cycles.len() == report3.cycles.len(),
"cycle count deterministic run2==run3",
report2.cycles.len(),
report3.cycles.len()
);
crate::assert_with_log!(
!report1.cycles.is_empty(),
"detects cycle",
true,
!report1.cycles.is_empty()
);
if !report1.cycles.is_empty() {
let cycle1 = &report1.cycles[0];
let cycle2 = &report2.cycles[0];
let cycle3 = &report3.cycles[0];
crate::assert_with_log!(
cycle1.tasks.len() == cycle2.tasks.len(),
"cycle task count deterministic run1==run2",
cycle1.tasks.len(),
cycle2.tasks.len()
);
crate::assert_with_log!(
cycle2.tasks.len() == cycle3.tasks.len(),
"cycle task count deterministic run2==run3",
cycle2.tasks.len(),
cycle3.tasks.len()
);
crate::assert_with_log!(
cycle1.tasks.contains(&task_a),
"cycle contains task A",
true,
cycle1.tasks.contains(&task_a)
);
crate::assert_with_log!(
cycle1.tasks.contains(&task_b),
"cycle contains task B",
true,
cycle1.tasks.contains(&task_b)
);
crate::assert_with_log!(
cycle1.tasks.contains(&task_c),
"cycle contains task C",
true,
cycle1.tasks.contains(&task_c)
);
crate::assert_with_log!(
cycle1.trapped == cycle2.trapped,
"trapped status deterministic",
true,
cycle1.trapped == cycle2.trapped
);
}
let health1 = diagnostics.analyze_structural_health();
let health2 = diagnostics.analyze_structural_health();
let health_deterministic = match (&health1.classification, &health2.classification) {
(
crate::observability::spectral_health::HealthClassification::Healthy { .. },
crate::observability::spectral_health::HealthClassification::Healthy { .. },
) => true,
(
crate::observability::spectral_health::HealthClassification::Deadlocked,
crate::observability::spectral_health::HealthClassification::Deadlocked,
) => true,
(
crate::observability::spectral_health::HealthClassification::Degraded { .. },
crate::observability::spectral_health::HealthClassification::Degraded { .. },
) => true,
(
crate::observability::spectral_health::HealthClassification::Critical { .. },
crate::observability::spectral_health::HealthClassification::Critical { .. },
) => true,
(
crate::observability::spectral_health::HealthClassification::Fragmented { .. },
crate::observability::spectral_health::HealthClassification::Fragmented { .. },
) => true,
_ => false,
};
crate::assert_with_log!(
health_deterministic,
"structural health deterministic",
true,
health_deterministic
);
crate::test_complete!("introspection_conf_008_deadlock_detection_determinism");
}
#[test]
fn introspection_conf_009_diagnostic_query_cancel_safety() {
init_test("introspection_conf_009_diagnostic_query_cancel_safety");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let task1 = insert_task(&mut state, root, TaskState::Running);
let task2 = insert_task(
&mut state,
root,
TaskState::CancelRequested {
reason: CancelReason::user("test"),
cleanup_budget: Budget::with_deadline_ns(100_000_000), },
);
let clock = Arc::new(VirtualClock::starting_at(Time::from_millis(1000)));
state.set_timer_driver(TimerDriverHandle::with_virtual_clock(clock));
let _ob1 = insert_obligation(
&mut state,
root,
task1,
ObligationKind::Ack,
Time::from_millis(100),
);
let _ob2 = insert_obligation(
&mut state,
root,
task2,
ObligationKind::Lease,
Time::from_millis(200),
);
let diagnostics = Diagnostics::new(Arc::new(state));
let baseline_explanation = diagnostics.explain_region_open(root);
let baseline_task1_explanation = diagnostics.explain_task_blocked(task1);
let baseline_task2_explanation = diagnostics.explain_task_blocked(task2);
let baseline_leaks = diagnostics.find_leaked_obligations();
let baseline_deadlock = diagnostics.analyze_directional_deadlock();
let baseline_health = diagnostics.analyze_structural_health();
let verify_explanation = diagnostics.explain_region_open(root);
let verify_task1_explanation = diagnostics.explain_task_blocked(task1);
let verify_task2_explanation = diagnostics.explain_task_blocked(task2);
let verify_leaks = diagnostics.find_leaked_obligations();
let verify_deadlock = diagnostics.analyze_directional_deadlock();
let verify_health = diagnostics.analyze_structural_health();
crate::assert_with_log!(
baseline_explanation.reasons.len() == verify_explanation.reasons.len(),
"region explanation cancel-safe",
baseline_explanation.reasons.len(),
verify_explanation.reasons.len()
);
let task1_reason_match = match (
&baseline_task1_explanation.block_reason,
&verify_task1_explanation.block_reason,
) {
(BlockReason::AwaitingSchedule, BlockReason::AwaitingSchedule) => true,
(BlockReason::NotStarted, BlockReason::NotStarted) => true,
(a, b) => format!("{:?}", a) == format!("{:?}", b),
};
crate::assert_with_log!(
task1_reason_match,
"task1 explanation cancel-safe",
true,
task1_reason_match
);
let task2_cancel_match = matches!(
(
&baseline_task2_explanation.block_reason,
&verify_task2_explanation.block_reason
),
(
BlockReason::CancelRequested { .. },
BlockReason::CancelRequested { .. }
)
);
crate::assert_with_log!(
task2_cancel_match,
"task2 cancel explanation cancel-safe",
true,
task2_cancel_match
);
crate::assert_with_log!(
baseline_leaks.len() == verify_leaks.len(),
"leak detection cancel-safe",
baseline_leaks.len(),
verify_leaks.len()
);
let deadlock_severity_match = baseline_deadlock.severity == verify_deadlock.severity;
crate::assert_with_log!(
deadlock_severity_match,
"deadlock detection cancel-safe",
true,
deadlock_severity_match
);
let health_class_match = match (
&baseline_health.classification,
&verify_health.classification,
) {
(
crate::observability::spectral_health::HealthClassification::Healthy { .. },
crate::observability::spectral_health::HealthClassification::Healthy { .. },
) => true,
(
crate::observability::spectral_health::HealthClassification::Deadlocked,
crate::observability::spectral_health::HealthClassification::Deadlocked,
) => true,
(
crate::observability::spectral_health::HealthClassification::Degraded { .. },
crate::observability::spectral_health::HealthClassification::Degraded { .. },
) => true,
(
crate::observability::spectral_health::HealthClassification::Critical { .. },
crate::observability::spectral_health::HealthClassification::Critical { .. },
) => true,
(
crate::observability::spectral_health::HealthClassification::Fragmented { .. },
crate::observability::spectral_health::HealthClassification::Fragmented { .. },
) => true,
_ => false,
};
crate::assert_with_log!(
health_class_match,
"health analysis cancel-safe",
true,
health_class_match
);
for (baseline_leak, verify_leak) in baseline_leaks.iter().zip(verify_leaks.iter()) {
crate::assert_with_log!(
baseline_leak.obligation_id == verify_leak.obligation_id,
"leak ID cancel-safe",
true,
baseline_leak.obligation_id == verify_leak.obligation_id
);
crate::assert_with_log!(
baseline_leak.age == verify_leak.age,
"leak age cancel-safe",
true,
baseline_leak.age == verify_leak.age
);
}
crate::test_complete!("introspection_conf_009_diagnostic_query_cancel_safety");
}
#[test]
fn introspection_conf_010_runtime_introspection_endpoint_stability() {
init_test("introspection_conf_010_runtime_introspection_endpoint_stability");
let empty_state = Arc::new(RuntimeState::new());
let empty_diagnostics = Diagnostics::new(empty_state);
let empty_root = RegionId::new_for_test(999, 0);
let empty_explanation = empty_diagnostics.explain_region_open(empty_root);
crate::assert_with_log!(
matches!(
empty_explanation.reasons.first(),
Some(Reason::RegionNotFound)
),
"empty runtime handles missing region",
true,
matches!(
empty_explanation.reasons.first(),
Some(Reason::RegionNotFound)
)
);
let empty_task = TaskId::new_for_test(999, 0);
let empty_task_explanation = empty_diagnostics.explain_task_blocked(empty_task);
crate::assert_with_log!(
matches!(
empty_task_explanation.block_reason,
BlockReason::TaskNotFound
),
"empty runtime handles missing task",
true,
matches!(
empty_task_explanation.block_reason,
BlockReason::TaskNotFound
)
);
let empty_leaks = empty_diagnostics.find_leaked_obligations();
crate::assert_with_log!(
empty_leaks.is_empty(),
"empty runtime has no leaks",
true,
empty_leaks.is_empty()
);
let mut minimal_state = RuntimeState::new();
let minimal_root = minimal_state.create_root_region(Budget::INFINITE);
let minimal_diagnostics = Diagnostics::new(Arc::new(minimal_state));
let minimal_explanation = minimal_diagnostics.explain_region_open(minimal_root);
crate::assert_with_log!(
minimal_explanation.region_state.is_some(),
"minimal runtime provides region state",
true,
minimal_explanation.region_state.is_some()
);
let minimal_leaks = minimal_diagnostics.find_leaked_obligations();
crate::assert_with_log!(
minimal_leaks.is_empty(),
"minimal runtime has no leaks",
true,
minimal_leaks.is_empty()
);
let mut complex_state = RuntimeState::new();
let complex_root = complex_state.create_root_region(Budget::INFINITE);
let complex_child1 = insert_child_region(&mut complex_state, complex_root);
let complex_child2 = insert_child_region(&mut complex_state, complex_root);
let _complex_task1 = insert_task(&mut complex_state, complex_child1, TaskState::Running);
let _complex_task2 = insert_task(
&mut complex_state,
complex_child2,
TaskState::Completed(Outcome::Ok(())),
);
let complex_task3 = insert_task(
&mut complex_state,
complex_root,
TaskState::CancelRequested {
reason: CancelReason::user("cleanup"),
cleanup_budget: Budget::with_deadline_ns(200_000_000), },
);
let complex_diagnostics = Diagnostics::new(Arc::new(complex_state));
let complex_explanation = complex_diagnostics.explain_region_open(complex_root);
crate::assert_with_log!(
!complex_explanation.reasons.is_empty(),
"complex runtime provides detailed reasons",
true,
!complex_explanation.reasons.is_empty()
);
let complex_cancel_explanation = complex_diagnostics.explain_task_blocked(complex_task3);
let is_complex_cancel = matches!(
complex_cancel_explanation.block_reason,
BlockReason::CancelRequested { .. }
);
crate::assert_with_log!(
is_complex_cancel,
"complex runtime tracks cancel state",
true,
is_complex_cancel
);
let mut timer_state = RuntimeState::new();
let timer_root = timer_state.create_root_region(Budget::INFINITE);
let timer_task = insert_task(&mut timer_state, timer_root, TaskState::Running);
let virtual_clock = Arc::new(VirtualClock::starting_at(Time::from_millis(5000)));
timer_state.set_timer_driver(TimerDriverHandle::with_virtual_clock(virtual_clock));
let _timer_obligation = insert_obligation(
&mut timer_state,
timer_root,
timer_task,
ObligationKind::Ack,
Time::from_millis(1000),
);
let timer_diagnostics = Diagnostics::new(Arc::new(timer_state));
let timer_leaks = timer_diagnostics.find_leaked_obligations();
crate::assert_with_log!(
!timer_leaks.is_empty(),
"timer-enabled runtime detects leaks",
true,
!timer_leaks.is_empty()
);
if let Some(leak) = timer_leaks.first() {
crate::assert_with_log!(
leak.age.as_millis() == 4000, "timer-enabled runtime calculates age correctly",
4000u128,
leak.age.as_millis()
);
}
let all_deadlock_reports = [
empty_diagnostics.analyze_directional_deadlock(),
minimal_diagnostics.analyze_directional_deadlock(),
complex_diagnostics.analyze_directional_deadlock(),
timer_diagnostics.analyze_directional_deadlock(),
];
for report in &all_deadlock_reports {
crate::assert_with_log!(
report.risk_score >= 0.0 && report.risk_score <= 1.0,
"deadlock risk score in valid range",
true,
report.risk_score >= 0.0 && report.risk_score <= 1.0
);
}
let all_health_reports = [
empty_diagnostics.analyze_structural_health(),
minimal_diagnostics.analyze_structural_health(),
complex_diagnostics.analyze_structural_health(),
timer_diagnostics.analyze_structural_health(),
];
for health in &all_health_reports {
let valid_classification = matches!(
health.classification,
crate::observability::spectral_health::HealthClassification::Healthy { .. }
| crate::observability::spectral_health::HealthClassification::Degraded { .. }
| crate::observability::spectral_health::HealthClassification::Deadlocked
| crate::observability::spectral_health::HealthClassification::Critical { .. }
| crate::observability::spectral_health::HealthClassification::Fragmented { .. }
);
crate::assert_with_log!(
valid_classification,
"health classification is valid",
true,
valid_classification
);
}
crate::test_complete!("introspection_conf_010_runtime_introspection_endpoint_stability");
}
#[test]
fn structured_diagnostic_report_snapshot_v4_with_extended_metrics() {
init_test("structured_diagnostic_report_snapshot_v4_with_extended_metrics");
let sections = vec![
DiagnosticReportV4Section {
label: "task_execution",
status: "degraded",
accounting: DiagnosticResourceAccounting {
total_regions: 12,
open_regions: 8,
total_tasks: 142,
live_tasks: 89,
total_obligations: 23,
leaked_obligations: 2,
},
histograms: vec![DiagnosticMetricHistogram {
name: "task_execution_latency_ms".to_string(),
buckets: vec![
("0-1ms".to_string(), 1247),
("1-5ms".to_string(), 823),
("5-10ms".to_string(), 156),
("10-50ms".to_string(), 89),
("50-100ms".to_string(), 12),
("100ms+".to_string(), 3),
],
total_count: 2330,
percentiles: vec![(50.0, 1.2), (95.0, 8.7), (99.0, 24.1), (99.9, 78.3)],
}],
rendered: "task_execution: Task execution metrics\n latency_p50: 1.2ms\n latency_p99: 24.1ms\n completion_rate: 94.2%",
},
DiagnosticReportV4Section {
label: "region_lifecycle",
status: "critical",
accounting: DiagnosticResourceAccounting {
total_regions: 67,
open_regions: 37,
total_tasks: 245,
live_tasks: 156,
total_obligations: 45,
leaked_obligations: 8,
},
histograms: vec![DiagnosticMetricHistogram {
name: "region_lifecycle_duration_ms".to_string(),
buckets: vec![
("0-10ms".to_string(), 89),
("10-100ms".to_string(), 156),
("100ms-1s".to_string(), 67),
("1s-10s".to_string(), 23),
("10s+".to_string(), 8),
],
total_count: 343,
percentiles: vec![(50.0, 45.2), (95.0, 2100.0), (99.0, 6780.0)],
}],
rendered: "region_lifecycle: Region lifecycle analysis\n duration_p50: 45.2ms\n duration_p99: 6.78s\n long_lived_count: 8",
},
DiagnosticReportV4Section {
label: "obligation_tracking",
status: "passing",
accounting: DiagnosticResourceAccounting {
total_regions: 34,
open_regions: 23,
total_tasks: 178,
live_tasks: 134,
total_obligations: 67,
leaked_obligations: 0,
},
histograms: vec![DiagnosticMetricHistogram {
name: "obligation_hold_time_ms".to_string(),
buckets: vec![
("0-1s".to_string(), 1200),
("1s-10s".to_string(), 45),
("10s-1min".to_string(), 12),
("1min+".to_string(), 3),
],
total_count: 1260,
percentiles: vec![
(50.0, 120.0),
(95.0, 8900.0),
(99.0, 45000.0),
(99.9, 120000.0),
],
}],
rendered: "obligation_tracking: Obligation hold time analysis\n hold_time_p50: 120ms\n hold_time_p99: 45s\n leak_candidates: 3",
},
];
let rendered = render_structured_diagnostic_report_v4(§ions);
assert_diagnostic_report_snapshot(
"observability_diagnostics_structured_report_v4",
&rendered,
);
crate::test_complete!("structured_diagnostic_report_snapshot_v4_with_extended_metrics");
}
#[test]
fn structured_diagnostic_report_v3_schema_golden_snapshot() {
let healthy_accounting = DiagnosticResourceAccounting {
total_regions: 1,
open_regions: 0,
total_tasks: 1,
live_tasks: 0,
total_obligations: 0,
leaked_obligations: 0,
};
let degraded_accounting = DiagnosticResourceAccounting {
total_regions: 3,
open_regions: 2,
total_tasks: 5,
live_tasks: 3,
total_obligations: 2,
leaked_obligations: 1,
};
let error_accounting = DiagnosticResourceAccounting {
total_regions: 2,
open_regions: 2,
total_tasks: 4,
live_tasks: 4,
total_obligations: 3,
leaked_obligations: 3,
};
let sections = vec![
DiagnosticReportV3Section {
label: "healthy_system",
status: "passing",
accounting: healthy_accounting,
rendered: "scenario: healthy_system\ngenerated_at: 2026-04-21T14:30:00Z\n\n[region]\nnone\n\n[task]\nAll tasks completed successfully.\n\n[leaks]\nnone",
},
DiagnosticReportV3Section {
label: "degraded_performance",
status: "degraded",
accounting: degraded_accounting,
rendered: "scenario: degraded_performance\ngenerated_at: 2026-04-21T14:30:01Z\n\n[region]\nRegion RegionId(1:0) has slow drain (2 children pending).\n\n[task]\nTask TaskId(2:1) experiencing high latency (p99: 450ms).\n\n[leaks]\n- ObligationId(1:0) region=RegionId(1:0) holder=Some(TaskId(2:1)) type=Lease age_ms=1500",
},
DiagnosticReportV3Section {
label: "error_state",
status: "critical",
accounting: error_accounting,
rendered: "scenario: error_state\ngenerated_at: 2026-04-21T14:30:02Z\n\n[region]\nRegion RegionId(0:0) has deadlock detected (cycle length: 2).\n\n[task]\nTask TaskId(0:0) blocked: deadlock (waiting on TaskId(1:0)).\n\n[leaks]\n- ObligationId(0:0) region=RegionId(0:0) holder=Some(TaskId(0:0)) type=Ack age_ms=2000\n- ObligationId(1:0) region=RegionId(0:0) holder=Some(TaskId(1:0)) type=Lease age_ms=2100\n- ObligationId(2:0) region=RegionId(1:0) holder=Some(TaskId(2:0)) type=Ack age_ms=1800",
},
];
let rendered = render_structured_diagnostic_report_v3(§ions);
assert_diagnostic_report_snapshot(
"observability_diagnostics_v3_schema_validation",
&rendered,
);
}
#[test]
fn sanitize_cancel_message_strips_crlf_to_space() {
let raw = "user cancelled\r\n[ERROR] FORGED LOG SPLICE";
let sanitized = sanitize_cancel_message(raw);
assert!(
!sanitized.contains('\r') && !sanitized.contains('\n'),
"CR/LF must be stripped, got {sanitized:?}"
);
assert!(sanitized.contains("FORGED LOG SPLICE"));
}
#[test]
fn sanitize_cancel_message_strips_tab_to_space() {
assert_eq!(sanitize_cancel_message("a\tb"), "a b");
}
#[test]
fn sanitize_cancel_message_replaces_other_controls_with_question() {
let raw = "x\x00y\x07z\x1bw\x7fv";
assert_eq!(sanitize_cancel_message(raw), "x?y?z?w?v");
}
#[test]
fn sanitize_cancel_message_preserves_printable_unicode() {
let raw = "deadline reached: ✓ æ¼¢å—";
assert_eq!(sanitize_cancel_message(raw), raw);
}
#[test]
fn sanitize_cancel_message_truncates_at_cap_with_ellipsis() {
let raw = "Y".repeat(500);
let sanitized = sanitize_cancel_message(&raw);
assert!(sanitized.starts_with(&"Y".repeat(MAX_SANITIZED_CANCEL_MESSAGE_LEN)));
assert!(sanitized.ends_with('…'));
assert_eq!(sanitized.len(), MAX_SANITIZED_CANCEL_MESSAGE_LEN + 3);
}
#[test]
fn sanitize_cancel_message_truncates_on_char_boundary_for_multibyte() {
let raw = "æ¼¢".repeat(86);
let sanitized = sanitize_cancel_message(&raw);
assert!(sanitized.ends_with('…'));
assert_eq!(sanitized.len(), 258);
assert!(std::str::from_utf8(sanitized.as_bytes()).is_ok());
}
#[test]
fn cancel_reason_info_from_reason_sanitizes_message() {
let info = CancelReasonInfo::from_reason(
CancelKind::User,
Some("graceful shutdown\n[ALERT] privilege escalation"),
);
let stored = info.message.as_deref().unwrap();
assert!(
!stored.contains('\n'),
"stored message must be sanitized, got {stored:?}"
);
assert!(stored.contains("[ALERT] privilege escalation"));
let display = format!("{info}");
assert!(!display.contains('\n'));
}
#[test]
fn cancel_reason_info_with_none_message_is_passthrough() {
let info = CancelReasonInfo::from_reason(CancelKind::User, None);
assert!(info.message.is_none());
let display = format!("{info}");
assert!(!display.contains('('));
}
fn assert_diagnostic_yaml_snapshot(snapshot_name: &str, value: &Value) {
insta::with_settings!({
snapshot_path => "../../tests/snapshots",
prepend_module_to_snapshot => false,
}, {
insta::assert_yaml_snapshot!(snapshot_name, value);
});
}
#[test]
fn diagnostic_runtime_state_dump_golden() {
init_test("diagnostic_runtime_state_dump_golden");
let mut state = RuntimeState::new();
let root_region = state.create_root_region(Budget::INFINITE);
let task1 = insert_task(&mut state, root_region, TaskState::Running);
let task2 = insert_task(&mut state, root_region, TaskState::Running);
let task3 = insert_task(
&mut state,
root_region,
TaskState::Completed(Outcome::Cancelled(CancelReason::timeout())),
);
let obligation = insert_obligation(
&mut state,
root_region,
task1,
ObligationKind::SendPermit,
Time::from_nanos(10_000),
);
let diagnostics = Diagnostics::new(Arc::new(state));
let region_explanation = diagnostics.explain_region_open(root_region);
let task_explanation = diagnostics.explain_task_blocked(task2);
let leaked_obligations = diagnostics.find_leaked_obligations();
let health_report = diagnostics.analyze_structural_health();
let diagnostic_dump = json!({
"runtime_state": {
"tasks": {
"task1": {
"id": task1.to_string(),
"state": "Running",
"region": root_region.to_string()
},
"task2": {
"id": task2.to_string(),
"state": "Running",
"region": root_region.to_string()
},
"task3": {
"id": task3.to_string(),
"state": "Cancelled",
"region": root_region.to_string()
}
},
"obligations": {
"obligation1": {
"id": obligation.to_string(),
"kind": "SendPermit",
"holder": task1.to_string(),
"region": root_region.to_string()
}
},
"regions": {
"root": {
"id": root_region.to_string(),
"explanation": {
"can_close": region_explanation.reasons.is_empty(),
"reasons_count": region_explanation.reasons.len()
}
}
}
},
"diagnostics": {
"task_blocked": {
"task_id": task2.to_string(),
"has_explanation": !task_explanation.details.is_empty()
|| !task_explanation.recommendations.is_empty()
},
"leaked_obligations": {
"count": leaked_obligations.len()
},
"health": {
"classification": match health_report.classification {
HealthClassification::Healthy { margin } => json!({
"kind": "healthy",
"margin": margin
}),
HealthClassification::Degraded { fiedler, .. } => json!({
"kind": "degraded",
"fiedler": fiedler
}),
HealthClassification::Critical {
fiedler,
approaching_disconnect,
} => json!({
"kind": "critical",
"fiedler": fiedler,
"approaching_disconnect": approaching_disconnect
}),
HealthClassification::Fragmented { components } => json!({
"kind": "fragmented",
"components": components
}),
HealthClassification::Deadlocked => json!({
"kind": "deadlocked"
}),
},
"fiedler_value": health_report.decomposition.fiedler_value,
"bottleneck_count": health_report.bottlenecks.len()
}
},
"meta": {
"test_scenario": "3_tasks_1_obligation_1_cancel",
"snapshot_version": "v1"
}
});
assert_diagnostic_yaml_snapshot("diagnostic_runtime_state_dump_golden", &diagnostic_dump);
}
}