1use crate::console::Console;
22use crate::observability::spectral_health::{
23 SpectralHealthMonitor, SpectralHealthReport, SpectralThresholds,
24};
25use crate::record::ObligationState;
26use crate::record::region::RegionState;
27use crate::record::task::TaskState;
28use crate::runtime::state::RuntimeState;
29use crate::time::TimerDriverHandle;
30use crate::tracing_compat::{debug, trace, warn};
31use crate::types::{CancelKind, ObligationId, RegionId, TaskId, Time};
32use serde::{Deserialize, Serialize};
33use std::collections::BTreeMap;
34use std::fmt;
35use std::sync::Arc;
36
/// Read-only diagnostic facade over a shared [`RuntimeState`].
///
/// All analyses are computed on demand from the current runtime state; the
/// only mutable piece is the spectral monitor, which is mutex-guarded so the
/// `&self` analysis methods can update its history.
#[derive(Debug)]
pub struct Diagnostics {
    /// Shared runtime bookkeeping this facade inspects.
    state: Arc<RuntimeState>,
    /// Optional console attachment; stored but not read by the methods
    /// visible in this file.
    console: Option<Console>,
    /// Spectral health analyzer; locked per analysis call.
    spectral_monitor: parking_lot::Mutex<SpectralHealthMonitor>,
}
44
impl Diagnostics {
    /// Creates a diagnostics facade over `state` with default spectral
    /// thresholds and no console attached.
    #[must_use]
    pub fn new(state: Arc<RuntimeState>) -> Self {
        Self {
            state,
            console: None,
            spectral_monitor: parking_lot::Mutex::new(SpectralHealthMonitor::new(
                SpectralThresholds::default(),
            )),
        }
    }

    /// Like [`Diagnostics::new`], but attaches a console.
    #[must_use]
    pub fn with_console(state: Arc<RuntimeState>, console: Console) -> Self {
        Self {
            state,
            console: Some(console),
            spectral_monitor: parking_lot::Mutex::new(SpectralHealthMonitor::new(
                SpectralThresholds::default(),
            )),
        }
    }

    /// Current runtime time, or [`Time::ZERO`] when no timer driver is installed.
    fn now(&self) -> Time {
        self.state
            .timer_driver()
            .map_or(Time::ZERO, TimerDriverHandle::now)
    }

    /// Builds the task wait-for graph over all non-terminal tasks.
    ///
    /// Nodes are the live task ids (sorted, so node indices are deterministic).
    /// A directed edge `(waiter, target)` records that `waiter` appears in
    /// `target.waiters`, i.e. the waiter is blocked on the target. The
    /// undirected projection is deduplicated for spectral analysis.
    fn build_task_wait_graph(&self) -> TaskWaitGraph {
        let mut task_ids: Vec<TaskId> = self
            .state
            .tasks_iter()
            .filter_map(|(_, task)| (!task.state.is_terminal()).then_some(task.id))
            .collect();
        task_ids.sort();
        // Dense index per task id so edges can use Vec positions.
        let index_by_task: BTreeMap<TaskId, usize> = task_ids
            .iter()
            .enumerate()
            .map(|(i, id)| (*id, i))
            .collect();

        let mut directed_edges = Vec::new();
        for (_, task) in self.state.tasks_iter() {
            if task.state.is_terminal() {
                continue;
            }
            let Some(&target_idx) = index_by_task.get(&task.id) else {
                continue;
            };
            for waiter in &task.waiters {
                // Waiters that are themselves terminal are not in the index
                // and are skipped.
                if let Some(&waiter_idx) = index_by_task.get(waiter) {
                    directed_edges.push((waiter_idx, target_idx));
                }
            }
        }
        directed_edges.sort_unstable();
        directed_edges.dedup();

        // Undirected view: normalize each edge to (min, max), dedup via set.
        let undirected_edges: Vec<(usize, usize)> = directed_edges
            .iter()
            .map(|(u, v)| if u < v { (*u, *v) } else { (*v, *u) })
            .collect::<std::collections::BTreeSet<_>>()
            .into_iter()
            .collect();

        TaskWaitGraph {
            task_ids,
            directed_edges,
            undirected_edges,
        }
    }

    /// Runs spectral analysis over the undirected wait graph, combined with a
    /// directional trapped-cycle check, and returns the monitor's report.
    #[must_use]
    pub fn analyze_structural_health(&self) -> SpectralHealthReport {
        let graph = self.build_task_wait_graph();
        let adjacency = wait_graph_adjacency(&graph);
        let mut monitor = self.spectral_monitor.lock();
        monitor.analyze_with_trapped_cycle(
            graph.task_ids.len(),
            &graph.undirected_edges,
            has_trapped_wait_cycle(&adjacency),
        )
    }

    /// Detects wait cycles via SCC decomposition of the directed wait graph
    /// and scores overall deadlock risk.
    ///
    /// A cycle with no egress edges ("trapped") cannot make progress without
    /// external intervention, so any trapped cycle escalates severity to
    /// `Critical`; non-trapped cycles yield `Elevated`.
    #[must_use]
    pub fn analyze_directional_deadlock(&self) -> DirectionalDeadlockReport {
        let graph = self.build_task_wait_graph();
        if graph.task_ids.is_empty() {
            return DirectionalDeadlockReport::empty();
        }

        let adjacency = wait_graph_adjacency(&graph);

        let sccs = strongly_connected_components(&adjacency);
        let mut components = Vec::new();
        let mut trapped = 0_u32;
        let mut cycle_nodes = 0_usize;

        for nodes in sccs {
            // A singleton SCC is only a cycle if it has a self-loop.
            let has_cycle = if nodes.len() > 1 {
                true
            } else {
                let n0 = nodes[0];
                adjacency[n0].contains(&n0)
            };
            if !has_cycle {
                continue;
            }
            cycle_nodes += nodes.len();
            let mut ingress = 0_u32;
            let mut egress = 0_u32;
            // Egress: edges leaving the component. `nodes` is sorted (Tarjan
            // sorts each SCC), so binary_search is a valid membership test.
            for &u in &nodes {
                for &v in &adjacency[u] {
                    if nodes.binary_search(&v).is_ok() {
                        continue;
                    }
                    egress = egress.saturating_add(1);
                }
            }
            // Ingress: edges entering the component from outside.
            let node_set: std::collections::BTreeSet<usize> = nodes.iter().copied().collect();
            for (u, edges) in adjacency.iter().enumerate() {
                if node_set.contains(&u) {
                    continue;
                }
                for &v in edges {
                    if node_set.contains(&v) {
                        ingress = ingress.saturating_add(1);
                    }
                }
            }
            let trapped_component = egress == 0;
            if trapped_component {
                trapped = trapped.saturating_add(1);
            }
            let mut tasks: Vec<TaskId> = nodes.iter().map(|idx| graph.task_ids[*idx]).collect();
            tasks.sort();
            components.push(DeadlockCycle {
                tasks,
                ingress_edges: ingress,
                egress_edges: egress,
                trapped: trapped_component,
            });
        }

        // Largest cycles first in the report.
        components.sort_by_key(|c| c.tasks.len());
        components.reverse();

        #[allow(clippy::cast_precision_loss)]
        let cycle_ratio = if graph.task_ids.is_empty() {
            0.0
        } else {
            cycle_nodes as f64 / graph.task_ids.len() as f64
        };
        #[allow(clippy::cast_precision_loss)]
        let trapped_ratio = if components.is_empty() {
            0.0
        } else {
            f64::from(trapped) / components.len() as f64
        };
        // Weighted blend: trapped cycles dominate (0.6) over cycle mass (0.4).
        let risk_score = 0.6f64
            .mul_add(trapped_ratio, 0.4 * cycle_ratio)
            .clamp(0.0, 1.0);
        let severity = if trapped > 0 {
            DeadlockSeverity::Critical
        } else if !components.is_empty() {
            DeadlockSeverity::Elevated
        } else {
            DeadlockSeverity::None
        };

        DirectionalDeadlockReport {
            severity,
            risk_score,
            cycles: components,
        }
    }

    /// Explains why `region_id` has not closed: open children, live tasks,
    /// reserved obligations, plus deadlock-risk recommendations.
    ///
    /// Returns `region_state: None` with a `RegionNotFound` reason when the
    /// id is unknown, and an empty explanation when the region is closed.
    #[must_use]
    pub fn explain_region_open(&self, region_id: RegionId) -> RegionOpenExplanation {
        trace!(region_id = ?region_id, "diagnostics: explain_region_open");

        let Some(region) = self.state.region(region_id) else {
            return RegionOpenExplanation {
                region_id,
                region_state: None,
                reasons: vec![Reason::RegionNotFound],
                recommendations: vec!["Verify region id is valid".to_string()],
            };
        };

        let region_state = region.state();
        // Already closed: nothing to explain.
        if region_state == RegionState::Closed {
            return RegionOpenExplanation {
                region_id,
                region_state: Some(region_state),
                reasons: Vec::new(),
                recommendations: Vec::new(),
            };
        }

        let mut reasons = Vec::new();

        // Open child regions keep the parent open. Sorted for determinism.
        let mut child_ids = region.child_ids();
        child_ids.sort();
        for child_id in child_ids {
            if let Some(child) = self.state.region(child_id) {
                let child_state = child.state();
                if child_state != RegionState::Closed {
                    reasons.push(Reason::ChildRegionOpen {
                        child_id,
                        child_state,
                    });
                }
            }
        }

        // Non-terminal tasks owned by the region.
        let mut task_ids = region.task_ids();
        task_ids.sort();
        for task_id in task_ids {
            if let Some(task) = self.state.task(task_id) {
                if !task.state.is_terminal() {
                    reasons.push(Reason::TaskRunning {
                        task_id,
                        task_state: task.state_name().to_string(),
                        poll_count: task.total_polls,
                    });
                }
            }
        }

        // Obligations still reserved against this region, sorted by id.
        let mut held = Vec::new();
        for (_, ob) in self.state.obligations_iter() {
            if ob.region == region_id && ob.state == ObligationState::Reserved {
                held.push((ob.id, ob.holder, ob.kind));
            }
        }
        held.sort_by_key(|(id, _, _)| *id);
        for (id, holder, kind) in held {
            reasons.push(Reason::ObligationHeld {
                obligation_id: id,
                obligation_type: format!("{kind:?}"),
                holder_task: holder,
            });
        }

        // One recommendation per reason category actually present.
        let mut recommendations = Vec::new();
        if reasons
            .iter()
            .any(|r| matches!(r, Reason::ChildRegionOpen { .. }))
        {
            recommendations.push("Wait for child regions to close, or cancel them.".to_string());
        }
        if reasons
            .iter()
            .any(|r| matches!(r, Reason::TaskRunning { .. }))
        {
            recommendations
                .push("Wait for live tasks to complete, or cancel the region.".to_string());
        }
        if reasons
            .iter()
            .any(|r| matches!(r, Reason::ObligationHeld { .. }))
        {
            recommendations
                .push("Ensure obligations are committed/aborted before closing.".to_string());
        }

        // Surface wait-cycle risk alongside the direct causes.
        let deadlock = self.analyze_directional_deadlock();
        if deadlock.severity != DeadlockSeverity::None {
            recommendations.push(format!(
                "Directional deadlock risk {:?} (score {:.3}); inspect cycles and break wait-for loops.",
                deadlock.severity, deadlock.risk_score
            ));
        }

        debug!(
            region_id = ?region_id,
            region_state = ?region_state,
            reason_count = reasons.len(),
            "diagnostics: region open explanation computed"
        );

        RegionOpenExplanation {
            region_id,
            region_state: Some(region_state),
            reasons,
            recommendations,
        }
    }

    /// Explains why `task_id` appears blocked, classified from its current
    /// [`TaskState`]; includes human-readable details and recommendations.
    #[must_use]
    pub fn explain_task_blocked(&self, task_id: TaskId) -> TaskBlockedExplanation {
        trace!(task_id = ?task_id, "diagnostics: explain_task_blocked");

        let Some(task) = self.state.task(task_id) else {
            return TaskBlockedExplanation {
                task_id,
                block_reason: BlockReason::TaskNotFound,
                details: Vec::new(),
                recommendations: vec!["Verify task id is valid".to_string()],
            };
        };

        let mut details = Vec::new();
        let mut recommendations = Vec::new();

        let block_reason = match &task.state {
            TaskState::Created => {
                recommendations.push("Task has not started polling yet.".to_string());
                BlockReason::NotStarted
            }
            TaskState::Running => {
                // A pending wake means the scheduler just has not run it yet;
                // otherwise the task is parked on some await point.
                if task.wake_state.is_notified() {
                    recommendations
                        .push("Task has a pending wake; it should be scheduled soon.".to_string());
                    BlockReason::AwaitingSchedule
                } else {
                    recommendations
                        .push("Task appears to be awaiting an async operation.".to_string());
                    BlockReason::AwaitingFuture {
                        description: "unknown await point".to_string(),
                    }
                }
            }
            TaskState::CancelRequested { reason, .. } => {
                details.push(format!("cancel kind: {}", reason.kind));
                if let Some(msg) = &reason.message {
                    details.push(format!("message: {msg}"));
                }
                recommendations.push("Task is cancelling; wait for drain/finalizers.".to_string());
                BlockReason::CancelRequested {
                    reason: CancelReasonInfo::from_reason(reason.kind, reason.message),
                }
            }
            TaskState::Cancelling {
                reason,
                cleanup_budget,
            } => {
                details.push(format!("cancel kind: {}", reason.kind));
                details.push(format!(
                    "cleanup polls remaining: {}",
                    cleanup_budget.poll_quota
                ));
                BlockReason::RunningCleanup {
                    reason: CancelReasonInfo::from_reason(reason.kind, reason.message),
                    polls_remaining: cleanup_budget.poll_quota,
                }
            }
            TaskState::Finalizing {
                reason,
                cleanup_budget,
            } => {
                details.push(format!("cancel kind: {}", reason.kind));
                details.push(format!(
                    "cleanup polls remaining: {}",
                    cleanup_budget.poll_quota
                ));
                BlockReason::Finalizing {
                    reason: CancelReasonInfo::from_reason(reason.kind, reason.message),
                    polls_remaining: cleanup_budget.poll_quota,
                }
            }
            TaskState::Completed(outcome) => {
                details.push(format!("outcome: {outcome:?}"));
                BlockReason::Completed
            }
        };

        // Downstream waiters are useful context regardless of the state.
        if !task.waiters.is_empty() {
            details.push(format!("waiters: {}", task.waiters.len()));
        }

        TaskBlockedExplanation {
            task_id,
            block_reason,
            details,
            recommendations,
        }
    }

    /// Reports every obligation still in `Reserved` state, with its age.
    ///
    /// All reserved obligations are listed; callers decide what counts as a
    /// leak. Results are sorted by `(region, obligation id)` for determinism.
    #[must_use]
    pub fn find_leaked_obligations(&self) -> Vec<ObligationLeak> {
        let now = self.now();
        let mut leaks = Vec::new();

        for (_, ob) in self.state.obligations_iter() {
            if ob.state == ObligationState::Reserved {
                // NOTE(review): assumes `Time::duration_since` yields
                // nanoseconds, matching `from_nanos` — confirm against `Time`.
                let age = std::time::Duration::from_nanos(now.duration_since(ob.reserved_at));
                leaks.push(ObligationLeak {
                    obligation_id: ob.id,
                    obligation_type: format!("{:?}", ob.kind),
                    holder_task: Some(ob.holder),
                    region_id: ob.region,
                    age,
                });
            }
        }

        leaks.sort_by_key(|l| (l.region_id, l.obligation_id));

        if !leaks.is_empty() {
            warn!(
                count = leaks.len(),
                "diagnostics: potential obligation leaks detected"
            );
        }

        leaks
    }
}
478
/// Snapshot of the task wait-for graph with tasks mapped to dense indices.
#[derive(Debug, Clone)]
struct TaskWaitGraph {
    /// Sorted ids of all non-terminal tasks; vector position = node index.
    task_ids: Vec<TaskId>,
    /// Deduplicated `(waiter, awaited)` node-index pairs.
    directed_edges: Vec<(usize, usize)>,
    /// Undirected projection of `directed_edges`, normalized to `(min, max)`.
    undirected_edges: Vec<(usize, usize)>,
}
485
486fn wait_graph_adjacency(graph: &TaskWaitGraph) -> Vec<Vec<usize>> {
487 let mut adjacency = vec![Vec::new(); graph.task_ids.len()];
488 for &(u, v) in &graph.directed_edges {
489 if u < adjacency.len() && v < adjacency.len() {
490 adjacency[u].push(v);
491 }
492 }
493 for edges in &mut adjacency {
494 edges.sort_unstable();
495 edges.dedup();
496 }
497 adjacency
498}
499
500fn has_trapped_wait_cycle(adjacency: &[Vec<usize>]) -> bool {
501 for nodes in strongly_connected_components(adjacency) {
502 let has_cycle = if nodes.len() > 1 {
503 true
504 } else {
505 let n0 = nodes[0];
506 adjacency[n0].contains(&n0)
507 };
508 if !has_cycle {
509 continue;
510 }
511
512 let node_set: std::collections::BTreeSet<usize> = nodes.iter().copied().collect();
513 let has_egress = nodes
514 .iter()
515 .any(|&u| adjacency[u].iter().any(|v| !node_set.contains(v)));
516 if !has_egress {
517 return true;
518 }
519 }
520
521 false
522}
523
/// Coarse severity classification for directional-deadlock findings.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DeadlockSeverity {
    /// No wait cycles were found.
    None,
    /// Wait cycles exist but every cycle still has an escape (egress) edge.
    Elevated,
    /// At least one cycle has no egress edge (trapped); likely a true deadlock.
    Critical,
}
534
/// One cyclic strongly connected component of the task wait graph.
#[derive(Debug, Clone)]
pub struct DeadlockCycle {
    /// Sorted ids of the tasks participating in the cycle.
    pub tasks: Vec<TaskId>,
    /// Number of wait edges entering the cycle from outside.
    pub ingress_edges: u32,
    /// Number of wait edges leaving the cycle; zero means trapped.
    pub egress_edges: u32,
    /// True when `egress_edges == 0`: no path out of the cycle.
    pub trapped: bool,
}
547
/// Result of directional (directed-graph) deadlock analysis.
#[derive(Debug, Clone)]
pub struct DirectionalDeadlockReport {
    /// Overall classification; `Critical` when any cycle is trapped.
    pub severity: DeadlockSeverity,
    /// Weighted risk in `[0.0, 1.0]` blending trapped ratio and cycle mass.
    pub risk_score: f64,
    /// Detected cycles, largest first.
    pub cycles: Vec<DeadlockCycle>,
}
558
impl DirectionalDeadlockReport {
    /// Report for an empty wait graph: no cycles, zero risk.
    #[must_use]
    fn empty() -> Self {
        Self {
            severity: DeadlockSeverity::None,
            risk_score: 0.0,
            cycles: Vec::new(),
        }
    }
}
569
/// Tarjan's algorithm over an index-based adjacency list.
///
/// Returns each strongly connected component as a sorted `Vec<usize>`;
/// components are emitted in Tarjan's completion order (reverse topological).
#[must_use]
fn strongly_connected_components(adjacency: &[Vec<usize>]) -> Vec<Vec<usize>> {
    /// Mutable traversal state threaded through the recursion.
    struct State<'g> {
        graph: &'g [Vec<usize>],
        counter: usize,
        stack: Vec<usize>,
        on_stack: Vec<bool>,
        order: Vec<Option<usize>>,
        low: Vec<usize>,
        out: Vec<Vec<usize>>,
    }

    impl State<'_> {
        fn visit(&mut self, node: usize) {
            self.order[node] = Some(self.counter);
            self.low[node] = self.counter;
            self.counter += 1;
            self.stack.push(node);
            self.on_stack[node] = true;

            for &next in &self.graph[node] {
                if self.order[next].is_none() {
                    // Tree edge: recurse, then propagate the child's low-link.
                    self.visit(next);
                    if self.low[next] < self.low[node] {
                        self.low[node] = self.low[next];
                    }
                } else if self.on_stack[next] {
                    // Back edge into the current stack.
                    let idx = self.order[next].unwrap_or(usize::MAX);
                    if idx < self.low[node] {
                        self.low[node] = idx;
                    }
                }
            }

            // `node` is an SCC root: pop its component off the stack.
            if Some(self.low[node]) == self.order[node] {
                let mut component = Vec::new();
                while let Some(member) = self.stack.pop() {
                    self.on_stack[member] = false;
                    component.push(member);
                    if member == node {
                        break;
                    }
                }
                component.sort_unstable();
                self.out.push(component);
            }
        }
    }

    let count = adjacency.len();
    let mut state = State {
        graph: adjacency,
        counter: 0,
        stack: Vec::new(),
        on_stack: vec![false; count],
        order: vec![None; count],
        low: vec![0; count],
        out: Vec::new(),
    };
    for node in 0..count {
        if state.order[node].is_none() {
            state.visit(node);
        }
    }
    state.out
}
633
/// Why a region has not yet closed, as reported by
/// [`Diagnostics::explain_region_open`].
#[derive(Debug, Clone)]
pub struct RegionOpenExplanation {
    /// Region that was queried.
    pub region_id: RegionId,
    /// Observed region state; `None` when the region id was not found.
    pub region_state: Option<RegionState>,
    /// Concrete causes keeping the region open (empty when already closed).
    pub reasons: Vec<Reason>,
    /// Suggested operator actions, one per reason category present.
    pub recommendations: Vec<String>,
}
646
647impl fmt::Display for RegionOpenExplanation {
648 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
649 writeln!(f, "Region {:?} is still open.", self.region_id)?;
650 if let Some(st) = self.region_state {
651 writeln!(f, " state: {st:?}")?;
652 }
653 for r in &self.reasons {
654 writeln!(f, " - {r}")?;
655 }
656 for rec in &self.recommendations {
657 writeln!(f, " -> {rec}")?;
658 }
659 Ok(())
660 }
661}
662
/// A single cause keeping a region open.
#[derive(Debug, Clone)]
pub enum Reason {
    /// The queried region id does not exist in the runtime state.
    RegionNotFound,
    /// A child region has not yet closed.
    ChildRegionOpen {
        /// Id of the still-open child.
        child_id: RegionId,
        /// Observed state of that child.
        child_state: RegionState,
    },
    /// A task owned by the region is not yet terminal.
    TaskRunning {
        /// Id of the live task.
        task_id: TaskId,
        /// Human-readable task state name.
        task_state: String,
        /// Total polls recorded for the task.
        poll_count: u64,
    },
    /// An obligation is still reserved against the region.
    ObligationHeld {
        /// Id of the reserved obligation.
        obligation_id: ObligationId,
        /// Debug-formatted obligation kind.
        obligation_type: String,
        /// Task currently holding the obligation.
        holder_task: TaskId,
    },
}
694
695impl fmt::Display for Reason {
696 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
697 match self {
698 Self::RegionNotFound => write!(f, "region not found"),
699 Self::ChildRegionOpen {
700 child_id,
701 child_state,
702 } => write!(f, "child region {child_id:?} still open ({child_state:?})"),
703 Self::TaskRunning {
704 task_id,
705 task_state,
706 poll_count,
707 } => write!(
708 f,
709 "task {task_id:?} still running (state={task_state}, polls={poll_count})"
710 ),
711 Self::ObligationHeld {
712 obligation_id,
713 obligation_type,
714 holder_task,
715 } => write!(
716 f,
717 "obligation {obligation_id:?} held by task {holder_task:?} (type={obligation_type})"
718 ),
719 }
720 }
721}
722
/// Why a task appears blocked, as reported by
/// [`Diagnostics::explain_task_blocked`].
#[derive(Debug, Clone)]
pub struct TaskBlockedExplanation {
    /// Task that was queried.
    pub task_id: TaskId,
    /// Classification derived from the task's current state.
    pub block_reason: BlockReason,
    /// Supplementary facts (cancel kind, messages, waiter count, ...).
    pub details: Vec<String>,
    /// Suggested operator actions.
    pub recommendations: Vec<String>,
}
735
736impl fmt::Display for TaskBlockedExplanation {
737 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
738 writeln!(f, "Task {:?} blocked: {}", self.task_id, self.block_reason)?;
739 for d in &self.details {
740 writeln!(f, " - {d}")?;
741 }
742 for rec in &self.recommendations {
743 writeln!(f, " -> {rec}")?;
744 }
745 Ok(())
746 }
747}
748
/// Classification of why a task is not making progress.
#[derive(Debug, Clone)]
pub enum BlockReason {
    /// The queried task id does not exist.
    TaskNotFound,
    /// The task was created but has not been polled yet.
    NotStarted,
    /// The task has a pending wake and is waiting for the scheduler.
    AwaitingSchedule,
    /// The task is parked on an async operation.
    AwaitingFuture {
        /// Best-effort description of the await point.
        description: String,
    },
    /// Cancellation was requested but cleanup has not started.
    CancelRequested {
        /// Why the task is being cancelled.
        reason: CancelReasonInfo,
    },
    /// The task is running cancellation cleanup.
    RunningCleanup {
        /// Why the task is being cancelled.
        reason: CancelReasonInfo,
        /// Poll quota left in the cleanup budget.
        polls_remaining: u32,
    },
    /// The task is running finalizers.
    Finalizing {
        /// Why the task is being cancelled.
        reason: CancelReasonInfo,
        /// Poll quota left in the cleanup budget.
        polls_remaining: u32,
    },
    /// The task already finished; it is not actually blocked.
    Completed,
}
785
786impl fmt::Display for BlockReason {
787 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
788 match self {
789 Self::TaskNotFound => f.write_str("task not found"),
790 Self::NotStarted => f.write_str("not started"),
791 Self::AwaitingSchedule => f.write_str("awaiting schedule"),
792 Self::AwaitingFuture { description } => write!(f, "awaiting future ({description})"),
793 Self::CancelRequested { reason } => write!(f, "cancel requested ({reason})"),
794 Self::RunningCleanup {
795 reason,
796 polls_remaining,
797 } => write!(
798 f,
799 "running cleanup ({reason}, polls_remaining={polls_remaining})"
800 ),
801 Self::Finalizing {
802 reason,
803 polls_remaining,
804 } => write!(
805 f,
806 "finalizing ({reason}, polls_remaining={polls_remaining})"
807 ),
808 Self::Completed => f.write_str("completed"),
809 }
810 }
811}
812
/// Describes a cancellation and the region path it propagated along.
///
/// NOTE(review): not constructed by any code visible in this file — confirm
/// the producer before relying on field semantics.
#[derive(Debug, Clone)]
pub struct CancellationExplanation {
    /// Kind of the originating cancellation.
    pub kind: CancelKind,
    /// Optional human-readable message attached to the cancellation.
    pub message: Option<String>,
    /// Regions the cancellation traversed (presumably in propagation order).
    pub propagation_path: Vec<CancellationStep>,
}
823
/// One hop in a cancellation propagation path.
#[derive(Debug, Clone)]
pub struct CancellationStep {
    /// Region reached at this step.
    pub region_id: RegionId,
    /// Cancellation kind applied at this step.
    pub kind: CancelKind,
}
832
/// Owned snapshot of a cancellation reason (kind plus optional message).
#[derive(Debug, Clone)]
pub struct CancelReasonInfo {
    /// Kind of cancellation requested.
    pub kind: CancelKind,
    /// Optional human-readable message.
    pub message: Option<String>,
}
841
842impl CancelReasonInfo {
843 fn from_reason(kind: CancelKind, message: Option<&str>) -> Self {
844 Self {
845 kind,
846 message: message.map(str::to_string),
847 }
848 }
849}
850
851impl fmt::Display for CancelReasonInfo {
852 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
853 if let Some(msg) = &self.message {
854 write!(f, "{} ({msg})", self.kind)
855 } else {
856 write!(f, "{}", self.kind)
857 }
858 }
859}
860
/// An obligation still reserved at scan time, reported by
/// [`Diagnostics::find_leaked_obligations`].
#[derive(Debug, Clone)]
pub struct ObligationLeak {
    /// Id of the reserved obligation.
    pub obligation_id: ObligationId,
    /// Debug-formatted obligation kind.
    pub obligation_type: String,
    /// Task holding the obligation (always `Some` from the visible producer).
    pub holder_task: Option<TaskId>,
    /// Region the obligation is charged against.
    pub region_id: RegionId,
    /// Time elapsed since the obligation was reserved.
    pub age: std::time::Duration,
}
875
/// Version tag of the advanced observability contract
/// ([`AdvancedObservabilityContract`]).
pub const ADVANCED_OBSERVABILITY_CONTRACT_VERSION: &str = "doctor-observability-v1";
/// Version tag of the baseline logging contract the advanced contract extends.
pub const ADVANCED_OBSERVABILITY_BASELINE_VERSION: &str = "doctor-logging-v1";
880
/// High-level classes of advanced observability events.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum AdvancedEventClass {
    /// Lifecycle of doctor/operator commands.
    CommandLifecycle,
    /// Reliability of external integrations.
    IntegrationReliability,
    /// Safety of remediation actions.
    RemediationSafety,
    /// Determinism of replay.
    ReplayDeterminism,
    /// Governance of verification workflows.
    VerificationGovernance,
}
895
impl AdvancedEventClass {
    /// Stable snake_case identifier for this class, suitable for structured logs.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::CommandLifecycle => "command_lifecycle",
            Self::IntegrationReliability => "integration_reliability",
            Self::RemediationSafety => "remediation_safety",
            Self::ReplayDeterminism => "replay_determinism",
            Self::VerificationGovernance => "verification_governance",
        }
    }
}
909
/// Severity levels for advanced observability events, ordered least to most severe.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum AdvancedSeverity {
    /// Informational only.
    Info,
    /// Needs attention but not failing.
    Warning,
    /// A failure occurred.
    Error,
    /// A failure requiring immediate action.
    Critical,
}
922
impl AdvancedSeverity {
    /// Stable lowercase identifier for this severity, suitable for structured logs.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::Info => "info",
            Self::Warning => "warning",
            Self::Error => "error",
            Self::Critical => "critical",
        }
    }
}
935
/// Axes along which troubleshooting evidence is organized.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum TroubleshootingDimension {
    /// How cancellation propagated.
    CancellationPath,
    /// Adherence to declared contracts.
    ContractCompliance,
    /// Reproducibility/determinism concerns.
    Determinism,
    /// Behavior of external dependencies.
    ExternalDependency,
    /// Actions taken by operators.
    OperatorAction,
    /// Planning of recovery steps.
    RecoveryPlanning,
    /// Runtime invariant violations.
    RuntimeInvariant,
}
954
impl TroubleshootingDimension {
    /// Stable snake_case identifier for this dimension, suitable for structured logs.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::CancellationPath => "cancellation_path",
            Self::ContractCompliance => "contract_compliance",
            Self::Determinism => "determinism",
            Self::ExternalDependency => "external_dependency",
            Self::OperatorAction => "operator_action",
            Self::RecoveryPlanning => "recovery_planning",
            Self::RuntimeInvariant => "runtime_invariant",
        }
    }
}
970
/// Contract entry describing one advanced event class.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AdvancedEventClassSpec {
    /// Stable class identifier (see [`AdvancedEventClass::as_str`]).
    pub class_id: String,
    /// Human-readable description of the class.
    pub description: String,
}
979
/// Contract entry describing one severity level and its meaning.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AdvancedSeveritySpec {
    /// Stable severity identifier (see [`AdvancedSeverity::as_str`]).
    pub severity: String,
    /// What this severity signifies to consumers.
    pub meaning: String,
}
988
/// Contract entry describing one troubleshooting dimension.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TroubleshootingDimensionSpec {
    /// Stable dimension identifier (see [`TroubleshootingDimension::as_str`]).
    pub dimension: String,
    /// What evidence this dimension is meant to capture.
    pub purpose: String,
}
997
/// Versioned contract describing the advanced observability event surface.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AdvancedObservabilityContract {
    /// Version of this contract (see [`ADVANCED_OBSERVABILITY_CONTRACT_VERSION`]).
    pub contract_version: String,
    /// Version of the baseline logging contract this extends.
    pub baseline_contract_version: String,
    /// All event classes covered by the contract.
    pub event_classes: Vec<AdvancedEventClassSpec>,
    /// Meaning of each severity level.
    pub severity_semantics: Vec<AdvancedSeveritySpec>,
    /// Troubleshooting dimensions consumers may rely on.
    pub troubleshooting_dimensions: Vec<TroubleshootingDimensionSpec>,
    /// Free-form compatibility guidance.
    pub compatibility_notes: Vec<String>,
}
1014
/// Version tag of the tail-latency taxonomy contract
/// ([`TailLatencyTaxonomyContract`]).
pub const TAIL_LATENCY_TAXONOMY_CONTRACT_VERSION: &str = "runtime-tail-latency-taxonomy-v1";
1017
/// Specification of a single structured-log field required by the taxonomy.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TailLatencyLogFieldSpec {
    /// Structured-log key.
    pub key: String,
    /// Unit of the recorded value (e.g. "ns", "count").
    pub unit: String,
    /// Whether emitters must always include the field.
    pub required: bool,
    /// What the field measures.
    pub meaning: String,
}
1030
/// Specification of one observable signal contributing to a tail-latency term.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TailLatencySignalSpec {
    /// Stable signal identifier (e.g. "queueing.ready_queue_depth").
    pub signal_id: String,
    /// Structured-log key under which the signal is emitted.
    pub structured_log_key: String,
    /// Unit of the signal's value.
    pub unit: String,
    /// Kind of producer (e.g. "snapshot_field", "stats_struct", "state_field").
    pub producer_kind: String,
    /// Fully qualified symbol that produces the signal.
    pub producer_symbol: String,
    /// Source file of the producer.
    pub producer_file: String,
    /// Either a "direct_duration" measurement or a "proxy_signal".
    pub measurement_class: String,
    /// True for canonical always-on signals; false for extended/optional ones.
    pub core: bool,
    /// Free-form notes about interpretation.
    pub notes: String,
}
1053
/// One additive term of the tail-latency decomposition equation.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TailLatencyTermSpec {
    /// Stable term identifier (e.g. "queueing", "service").
    pub term_id: String,
    /// Human-readable description of what the term covers.
    pub description: String,
    /// Structured-log key carrying the term's direct duration in nanoseconds.
    pub direct_duration_key: String,
    /// Structured-log key carrying the term's attribution state.
    pub attribution_state_key: String,
    /// Signals that evidence this term.
    pub signals: Vec<TailLatencySignalSpec>,
}
1068
/// Versioned contract describing the tail-latency decomposition taxonomy.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TailLatencyTaxonomyContract {
    /// Version of this contract (see [`TAIL_LATENCY_TAXONOMY_CONTRACT_VERSION`]).
    pub contract_version: String,
    /// The decomposition equation, as human-readable text.
    pub equation: String,
    /// Structured-log key for total observed latency.
    pub total_latency_key: String,
    /// Structured-log key for latency not attributed to any term.
    pub unknown_bucket_key: String,
    /// Log fields every emitter must provide.
    pub required_log_fields: Vec<TailLatencyLogFieldSpec>,
    /// All decomposition terms.
    pub terms: Vec<TailLatencyTermSpec>,
    /// Sampling rules emitters must follow.
    pub sampling_policy: Vec<String>,
    /// Free-form compatibility guidance.
    pub compatibility_notes: Vec<String>,
}
1089
1090fn tail_latency_log_field(
1091 key: &str,
1092 unit: &str,
1093 required: bool,
1094 meaning: &str,
1095) -> TailLatencyLogFieldSpec {
1096 TailLatencyLogFieldSpec {
1097 key: key.to_string(),
1098 unit: unit.to_string(),
1099 required,
1100 meaning: meaning.to_string(),
1101 }
1102}
1103
1104#[allow(clippy::too_many_arguments)]
1105fn tail_latency_signal(
1106 signal_id: &str,
1107 structured_log_key: &str,
1108 unit: &str,
1109 producer_kind: &str,
1110 producer_symbol: &str,
1111 producer_file: &str,
1112 measurement_class: &str,
1113 core: bool,
1114 notes: &str,
1115) -> TailLatencySignalSpec {
1116 TailLatencySignalSpec {
1117 signal_id: signal_id.to_string(),
1118 structured_log_key: structured_log_key.to_string(),
1119 unit: unit.to_string(),
1120 producer_kind: producer_kind.to_string(),
1121 producer_symbol: producer_symbol.to_string(),
1122 producer_file: producer_file.to_string(),
1123 measurement_class: measurement_class.to_string(),
1124 core,
1125 notes: notes.to_string(),
1126 }
1127}
1128
/// Builds the `queueing` taxonomy term: backlog observed before useful work
/// begins (ready queues, drain queues, bulkhead and pool waiters).
fn queueing_tail_latency_term() -> TailLatencyTermSpec {
    TailLatencyTermSpec {
        term_id: "queueing".to_string(),
        description:
            "Backlog before useful work begins, spanning ready queues, waiters, and drain queues."
                .to_string(),
        direct_duration_key: "tail.queueing.ns".to_string(),
        attribution_state_key: "tail.queueing.attribution_state".to_string(),
        signals: vec![
            tail_latency_signal(
                "queueing.ready_queue_depth",
                "tail.queueing.ready_queue_depth",
                "count",
                "snapshot_field",
                "asupersync::obligation::lyapunov::StateSnapshot::ready_queue_depth",
                "src/obligation/lyapunov.rs",
                "proxy_signal",
                true,
                "Canonical scheduler backlog proxy used by the three-lane decision contract.",
            ),
            tail_latency_signal(
                "queueing.draining_regions",
                "tail.queueing.draining_regions",
                "count",
                "snapshot_field",
                "asupersync::obligation::lyapunov::StateSnapshot::draining_regions",
                "src/obligation/lyapunov.rs",
                "proxy_signal",
                true,
                "Captures cancellation/finalizer drain backlog that elongates queueing tails.",
            ),
            tail_latency_signal(
                "queueing.bulkhead_queue_depth",
                "tail.queueing.bulkhead_queue_depth",
                "count",
                "stats_struct",
                "asupersync::combinator::bulkhead::BulkheadMetrics::queue_depth",
                "src/combinator/bulkhead.rs",
                "proxy_signal",
                false,
                "Extended queueing proxy for admission-controlled bulkhead lanes.",
            ),
            tail_latency_signal(
                "queueing.pool_waiters",
                "tail.queueing.pool_waiters",
                "count",
                "stats_struct",
                "asupersync::sync::pool::PoolStats::waiters",
                "src/sync/pool.rs",
                "proxy_signal",
                false,
                "Extended backlog proxy for pool acquisition queues.",
            ),
        ],
    }
}
1185
/// Builds the `service` taxonomy term: CPU work once a task is scheduled,
/// evidenced by poll counts and budget consumption.
fn service_tail_latency_term() -> TailLatencyTermSpec {
    TailLatencyTermSpec {
        term_id: "service".to_string(),
        description:
            "CPU work once the task is scheduled, including poll consumption and budget burn."
                .to_string(),
        direct_duration_key: "tail.service.ns".to_string(),
        attribution_state_key: "tail.service.attribution_state".to_string(),
        signals: vec![
            tail_latency_signal(
                "service.poll_count",
                "tail.service.poll_count",
                "count",
                "snapshot_field",
                "asupersync::runtime::state::TaskSnapshot::poll_count",
                "src/runtime/state.rs",
                "proxy_signal",
                true,
                "Canonical always-on service proxy derived from task budget consumption.",
            ),
            tail_latency_signal(
                "service.poll_quota_consumed",
                "tail.service.poll_quota_consumed",
                "quota_units",
                "stats_struct",
                "asupersync::observability::resource_accounting::ResourceAccountingSnapshot::poll_quota_consumed",
                "src/observability/resource_accounting.rs",
                "proxy_signal",
                true,
                "Aggregated service-pressure counter for runtime/test emitters.",
            ),
            tail_latency_signal(
                "service.cost_quota_consumed",
                "tail.service.cost_quota_consumed",
                "cost_units",
                "stats_struct",
                "asupersync::observability::resource_accounting::ResourceAccountingSnapshot::cost_quota_consumed",
                "src/observability/resource_accounting.rs",
                "proxy_signal",
                false,
                "Extended service-pressure counter for cost-aware workloads.",
            ),
        ],
    }
}
1231
/// Builds the `io_or_network` taxonomy term: time spent waiting on or
/// draining reactor/network activity.
fn io_or_network_tail_latency_term() -> TailLatencyTermSpec {
    TailLatencyTermSpec {
        term_id: "io_or_network".to_string(),
        description: "Latency spent waiting on or draining reactor/network activity.".to_string(),
        direct_duration_key: "tail.io_or_network.ns".to_string(),
        attribution_state_key: "tail.io_or_network.attribution_state".to_string(),
        signals: vec![
            tail_latency_signal(
                "io_or_network.events_received",
                "tail.io_or_network.events_received",
                "count",
                "stats_struct",
                "asupersync::runtime::io_driver::IoStats::events_received",
                "src/runtime/io_driver.rs",
                "proxy_signal",
                true,
                "Canonical always-on I/O/network pressure proxy from the reactor driver.",
            ),
            tail_latency_signal(
                "io_or_network.polls",
                "tail.io_or_network.polls",
                "count",
                "stats_struct",
                "asupersync::runtime::io_driver::IoStats::polls",
                "src/runtime/io_driver.rs",
                "proxy_signal",
                false,
                "Extended reactor activity proxy for sustained polling pressure.",
            ),
            tail_latency_signal(
                "io_or_network.wakers_dispatched",
                "tail.io_or_network.wakers_dispatched",
                "count",
                "stats_struct",
                "asupersync::runtime::io_driver::IoStats::wakers_dispatched",
                "src/runtime/io_driver.rs",
                "proxy_signal",
                false,
                "Extended proxy for wake fan-out caused by readiness events.",
            ),
        ],
    }
}
1275
/// Builds the `retries` taxonomy term: backoff and reattempt inflation from
/// retry, rate-limit, and circuit-breaker control loops.
fn retries_tail_latency_term() -> TailLatencyTermSpec {
    TailLatencyTermSpec {
        term_id: "retries".to_string(),
        description:
            "Backoff and reattempt inflation introduced by retry/rate-limit/circuit-breaker control loops."
                .to_string(),
        direct_duration_key: "tail.retries.ns".to_string(),
        attribution_state_key: "tail.retries.attribution_state".to_string(),
        signals: vec![
            tail_latency_signal(
                "retries.total_delay_ns",
                "tail.retries.total_delay_ns",
                "ns",
                "state_field",
                "asupersync::combinator::retry::RetryState::total_delay",
                "src/combinator/retry.rs",
                "direct_duration",
                true,
                "Direct retry-delay contribution from the retry combinator.",
            ),
            tail_latency_signal(
                "retries.rate_limit_total_wait_ns",
                "tail.retries.rate_limit_total_wait_ns",
                "ns",
                "stats_struct",
                "asupersync::combinator::rate_limit::RateLimitMetrics::total_wait_time",
                "src/combinator/rate_limit.rs",
                "direct_duration",
                false,
                "Extended direct delay when token-bucket admission defers work.",
            ),
            tail_latency_signal(
                "retries.circuit_rejected_total",
                "tail.retries.circuit_rejected_total",
                "count",
                "stats_struct",
                "asupersync::combinator::circuit_breaker::CircuitBreakerMetrics::total_rejected",
                "src/combinator/circuit_breaker.rs",
                "proxy_signal",
                false,
                "Extended retry/control-loop pressure proxy when open circuits reject work.",
            ),
        ],
    }
}
1321
/// Builds the `synchronization` taxonomy term: coordination delay from locks,
/// pools, obligations, and cancellation-aware rendezvous.
fn synchronization_tail_latency_term() -> TailLatencyTermSpec {
    TailLatencyTermSpec {
        term_id: "synchronization".to_string(),
        description:
            "Coordination delay from locks, pools, obligations, and cancellation-aware rendezvous."
                .to_string(),
        direct_duration_key: "tail.synchronization.ns".to_string(),
        attribution_state_key: "tail.synchronization.attribution_state".to_string(),
        signals: vec![
            tail_latency_signal(
                "synchronization.lock_wait_ns",
                "tail.synchronization.lock_wait_ns",
                "ns",
                "stats_struct",
                "asupersync::sync::contended_mutex::LockMetricsSnapshot::wait_ns",
                "src/sync/contended_mutex.rs",
                "direct_duration",
                true,
                "Canonical direct synchronization delay from contention-instrumented locks.",
            ),
            tail_latency_signal(
                "synchronization.lock_hold_ns",
                "tail.synchronization.lock_hold_ns",
                "ns",
                "stats_struct",
                "asupersync::sync::contended_mutex::LockMetricsSnapshot::hold_ns",
                "src/sync/contended_mutex.rs",
                "proxy_signal",
                false,
                "Extended proxy for convoying and long critical sections.",
            ),
            tail_latency_signal(
                "synchronization.pool_total_wait_ns",
                "tail.synchronization.pool_total_wait_ns",
                "ns",
                "stats_struct",
                "asupersync::sync::pool::PoolStats::total_wait_time",
                "src/sync/pool.rs",
                "direct_duration",
                false,
                "Extended direct delay from resource-pool acquisition waits.",
            ),
            tail_latency_signal(
                "synchronization.obligations_pending",
                "tail.synchronization.obligations_pending",
                "count",
                "stats_struct",
                "asupersync::observability::resource_accounting::ResourceAccountingSnapshot::obligations_pending",
                "src/observability/resource_accounting.rs",
                "proxy_signal",
                true,
                "Captures obligation/cancellation backlog that can extend synchronization tails.",
            ),
        ],
    }
}
1378
/// Builds the `allocator_or_cache` term of the tail-latency taxonomy contract.
///
/// This term has no direct-duration signal: all three signals are proxies
/// derived from region-heap churn and memory high-water marks, so allocator
/// pressure is observable even without direct timing instrumentation.
fn allocator_or_cache_tail_latency_term() -> TailLatencyTermSpec {
    TailLatencyTermSpec {
        term_id: "allocator_or_cache".to_string(),
        description:
            "Allocator and cache-locality pressure observable from region-heap churn and memory high-water marks."
                .to_string(),
        direct_duration_key: "tail.allocator_or_cache.ns".to_string(),
        attribution_state_key: "tail.allocator_or_cache.attribution_state".to_string(),
        signals: vec![
            // Canonical proxy for the term (always emitted).
            tail_latency_signal(
                "allocator_or_cache.live_allocations",
                "tail.allocator_or_cache.live_allocations",
                "count",
                "stats_struct",
                "asupersync::runtime::region_heap::HeapStats::live",
                "src/runtime/region_heap.rs",
                "proxy_signal",
                true,
                "Canonical allocator-pressure proxy from live region-heap allocations.",
            ),
            tail_latency_signal(
                "allocator_or_cache.bytes_live",
                "tail.allocator_or_cache.bytes_live",
                "bytes",
                "stats_struct",
                "asupersync::runtime::region_heap::HeapStats::bytes_live",
                "src/runtime/region_heap.rs",
                "proxy_signal",
                false,
                "Extended allocator-pressure proxy for live retained bytes.",
            ),
            tail_latency_signal(
                "allocator_or_cache.heap_bytes_peak",
                "tail.allocator_or_cache.heap_bytes_peak",
                "bytes",
                "stats_struct",
                "asupersync::observability::resource_accounting::ResourceAccountingSnapshot::heap_bytes_peak",
                "src/observability/resource_accounting.rs",
                "proxy_signal",
                false,
                "Extended region-level memory high-water mark for cache/allocator analysis.",
            ),
        ],
    }
}
1424
/// Builds the `unknown` (residual) term of the tail-latency taxonomy contract.
///
/// The single `unknown_bucket` signal is mandatory whenever any other term
/// lacks direct attribution, so unattributed latency never silently vanishes
/// from evidence bundles.
fn unknown_tail_latency_term() -> TailLatencyTermSpec {
    TailLatencyTermSpec {
        term_id: "unknown".to_string(),
        description:
            "Residual latency that remains unattributed after measured terms and proxies are accounted for."
                .to_string(),
        direct_duration_key: "tail.unknown.unmeasured_ns".to_string(),
        attribution_state_key: "tail.unknown.attribution_state".to_string(),
        signals: vec![tail_latency_signal(
            "unknown.unmeasured_ns",
            "tail.unknown.unmeasured_ns",
            "ns",
            "contract_field",
            "asupersync::observability::diagnostics::tail_latency_taxonomy_contract",
            "src/observability/diagnostics.rs",
            "unknown_bucket",
            true,
            "Must be emitted whenever any term lacks direct attribution so latency does not disappear from evidence bundles.",
        )],
    }
}
1446
/// Assembles the full versioned tail-latency taxonomy contract.
///
/// The contract pins: the additive decomposition equation, the stable
/// structured-log keys (total, per-term required fields, and the mandatory
/// unknown bucket), the seven term specifications, and the sampling and
/// compatibility policies. Structured-log keys are append-only within a
/// contract version (see `compatibility_notes`).
#[must_use]
pub fn tail_latency_taxonomy_contract() -> TailLatencyTaxonomyContract {
    TailLatencyTaxonomyContract {
        contract_version: TAIL_LATENCY_TAXONOMY_CONTRACT_VERSION.to_string(),
        // The decomposition every emitter must satisfy; residual goes to unknown.
        equation: "tail_latency_ns = queueing_ns + service_ns + io_or_network_ns + retries_ns + synchronization_ns + allocator_or_cache_ns + unknown_ns".to_string(),
        total_latency_key: "tail.total_latency_ns".to_string(),
        unknown_bucket_key: "tail.unknown.unmeasured_ns".to_string(),
        // One always-required field per term, plus version and total.
        required_log_fields: vec![
            tail_latency_log_field(
                "tail.contract_version",
                "schema_id",
                true,
                "Versioned tail-latency taxonomy contract identifier.",
            ),
            tail_latency_log_field(
                "tail.total_latency_ns",
                "ns",
                true,
                "Observed end-to-end tail latency for the operation under analysis.",
            ),
            tail_latency_log_field(
                "tail.queueing.ready_queue_depth",
                "count",
                true,
                "Always-on queueing proxy based on runnable backlog.",
            ),
            tail_latency_log_field(
                "tail.service.poll_count",
                "count",
                true,
                "Service-side work proxy based on task poll demand.",
            ),
            tail_latency_log_field(
                "tail.io_or_network.events_received",
                "count",
                true,
                "I/O or network pressure proxy based on reactor event volume.",
            ),
            tail_latency_log_field(
                "tail.retries.total_delay_ns",
                "ns",
                true,
                "Direct retry/backoff delay accumulated by retry combinators.",
            ),
            tail_latency_log_field(
                "tail.synchronization.lock_wait_ns",
                "ns",
                true,
                "Direct synchronization delay from contention-instrumented locks.",
            ),
            tail_latency_log_field(
                "tail.allocator_or_cache.live_allocations",
                "count",
                true,
                "Allocator/cache pressure proxy based on live region-heap allocations.",
            ),
            tail_latency_log_field(
                "tail.unknown.unmeasured_ns",
                "ns",
                true,
                "Residual latency that remains unattributed after measured terms and proxies are recorded.",
            ),
        ],
        // Term order mirrors the equation above.
        terms: vec![
            queueing_tail_latency_term(),
            service_tail_latency_term(),
            io_or_network_tail_latency_term(),
            retries_tail_latency_term(),
            synchronization_tail_latency_term(),
            allocator_or_cache_tail_latency_term(),
            unknown_tail_latency_term(),
        ],
        sampling_policy: vec![
            "Always emit the required core fields for any tail-latency event, even when extended observability sampling is disabled.".to_string(),
            "Extended fields may be sampled or emitted only in replay/forensics modes, but they must retain the stable keys defined here.".to_string(),
            "If a direct-duration field is unavailable for a term, preserve proxy signals and roll the residual duration into tail.unknown.unmeasured_ns.".to_string(),
        ],
        compatibility_notes: vec![
            "Structured-log keys are append-only within a contract version; removals or unit changes require a new contract version.".to_string(),
            "Proxy signals are not interchangeable with direct-duration fields; emitters must preserve both semantics explicitly.".to_string(),
            "Unknown contribution is mandatory whenever attribution is incomplete so downstream controllers never treat missing data as zero.".to_string(),
        ],
    }
}
1532
/// Borrowed view of a baseline structured-log event awaiting advanced
/// classification by [`classify_baseline_log_event`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BaselineLogEvent<'a> {
    /// Baseline flow identifier; valid values are those accepted by
    /// `BaselineFlowId::parse` ("execution", "integration", "remediation", "replay").
    pub flow_id: &'a str,
    /// Baseline event kind; valid values are those accepted by
    /// `BaselineEventKind::parse` (e.g. "command_start", "replay_complete").
    pub event_kind: &'a str,
    /// Baseline outcome class; valid values are those accepted by
    /// `BaselineOutcomeClass::parse` ("cancelled", "failed", "success").
    pub outcome_class: &'a str,
}
1543
/// Taxonomy contradiction detected while classifying a baseline event.
///
/// Conflicts escalate severity and always add the contract-compliance
/// troubleshooting dimension (see `classify_baseline_log_event`).
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum AdvancedClassificationConflict {
    /// The event kind is not valid for the flow it was reported under
    /// (per `flow_allows_event`).
    FlowEventMismatch {
        /// Raw flow id from the baseline event.
        flow_id: String,
        /// Raw event kind from the baseline event.
        event_kind: String,
    },
    /// The outcome contradicts the event kind (e.g. an `integration_error`
    /// event reported with a `success` outcome).
    OutcomeEventMismatch {
        /// Raw event kind from the baseline event.
        event_kind: String,
        /// Raw outcome class from the baseline event.
        outcome_class: String,
    },
}
1562
/// Result of advanced classification for a single baseline log event.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AdvancedLogClassification {
    /// Advanced event class derived from the baseline event kind.
    pub event_class: AdvancedEventClass,
    /// Final severity after outcome mapping and conflict escalation.
    pub severity: AdvancedSeverity,
    /// Troubleshooting dimensions, sorted and deduplicated.
    pub dimensions: Vec<TroubleshootingDimension>,
    /// Human-readable summary: "{flow}:{kind} {outcome phrase}. {kind narrative}".
    pub narrative: String,
    /// Operator action hint; extended with a conflict warning when
    /// `conflicts` is non-empty.
    pub recommended_action: String,
    /// Detected taxonomy contradictions, sorted; empty for clean events.
    pub conflicts: Vec<AdvancedClassificationConflict>,
}
1579
/// Known baseline flow identifiers; string forms are parsed by
/// `BaselineFlowId::parse`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BaselineFlowId {
    Execution,
    Integration,
    Remediation,
    Replay,
}
1587
1588impl BaselineFlowId {
1589 fn parse(raw: &str) -> Option<Self> {
1590 match raw {
1591 "execution" => Some(Self::Execution),
1592 "integration" => Some(Self::Integration),
1593 "remediation" => Some(Self::Remediation),
1594 "replay" => Some(Self::Replay),
1595 _ => None,
1596 }
1597 }
1598}
1599
/// Known baseline event kinds; string forms are parsed by
/// `BaselineEventKind::parse`. Kind-specific semantics live in
/// `kind_semantics`, and flow validity in `flow_allows_event`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BaselineEventKind {
    CommandComplete,
    CommandStart,
    IntegrationError,
    IntegrationSync,
    RemediationApply,
    RemediationVerify,
    ReplayComplete,
    ReplayStart,
    VerificationSummary,
}
1612
1613impl BaselineEventKind {
1614 fn parse(raw: &str) -> Option<Self> {
1615 match raw {
1616 "command_complete" => Some(Self::CommandComplete),
1617 "command_start" => Some(Self::CommandStart),
1618 "integration_error" => Some(Self::IntegrationError),
1619 "integration_sync" => Some(Self::IntegrationSync),
1620 "remediation_apply" => Some(Self::RemediationApply),
1621 "remediation_verify" => Some(Self::RemediationVerify),
1622 "replay_complete" => Some(Self::ReplayComplete),
1623 "replay_start" => Some(Self::ReplayStart),
1624 "verification_summary" => Some(Self::VerificationSummary),
1625 _ => None,
1626 }
1627 }
1628}
1629
/// Known baseline outcome classes; string forms are parsed by
/// `BaselineOutcomeClass::parse`. Drives the baseline severity mapping in
/// `classify_baseline_log_event`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BaselineOutcomeClass {
    Cancelled,
    Failed,
    Success,
}
1636
1637impl BaselineOutcomeClass {
1638 fn parse(raw: &str) -> Option<Self> {
1639 match raw {
1640 "cancelled" => Some(Self::Cancelled),
1641 "failed" => Some(Self::Failed),
1642 "success" => Some(Self::Success),
1643 _ => None,
1644 }
1645 }
1646}
1647
/// Assembles the advanced-observability contract layered on the baseline
/// structured-log schema.
///
/// Pins the event classes, severity semantics, and troubleshooting
/// dimensions used by `classify_baseline_log_event`, plus the compatibility
/// rules (additive-only evolution; unknown baseline values are hard errors).
#[must_use]
pub fn advanced_observability_contract() -> AdvancedObservabilityContract {
    AdvancedObservabilityContract {
        contract_version: ADVANCED_OBSERVABILITY_CONTRACT_VERSION.to_string(),
        baseline_contract_version: ADVANCED_OBSERVABILITY_BASELINE_VERSION.to_string(),
        event_classes: vec![
            AdvancedEventClassSpec {
                class_id: AdvancedEventClass::CommandLifecycle.as_str().to_string(),
                description: "Execution command lifecycle and gate telemetry.".to_string(),
            },
            AdvancedEventClassSpec {
                class_id: AdvancedEventClass::IntegrationReliability
                    .as_str()
                    .to_string(),
                description: "Cross-system integration health and boundary reliability."
                    .to_string(),
            },
            AdvancedEventClassSpec {
                class_id: AdvancedEventClass::RemediationSafety.as_str().to_string(),
                description: "Remediation safety, application, and post-fix verification."
                    .to_string(),
            },
            AdvancedEventClassSpec {
                class_id: AdvancedEventClass::ReplayDeterminism.as_str().to_string(),
                description: "Replay lifecycle and deterministic reproducibility.".to_string(),
            },
            AdvancedEventClassSpec {
                class_id: AdvancedEventClass::VerificationGovernance
                    .as_str()
                    .to_string(),
                description: "Verification summary and governance gate posture.".to_string(),
            },
        ],
        severity_semantics: vec![
            AdvancedSeveritySpec {
                severity: AdvancedSeverity::Critical.as_str().to_string(),
                meaning: "Contract/taxonomy contradiction requiring immediate correction."
                    .to_string(),
            },
            AdvancedSeveritySpec {
                severity: AdvancedSeverity::Error.as_str().to_string(),
                meaning: "Actionable failure impacting reliability or correctness.".to_string(),
            },
            AdvancedSeveritySpec {
                severity: AdvancedSeverity::Info.as_str().to_string(),
                meaning: "Expected state transition with no direct intervention required."
                    .to_string(),
            },
            AdvancedSeveritySpec {
                severity: AdvancedSeverity::Warning.as_str().to_string(),
                meaning: "Non-terminal issue or cancellation requiring review.".to_string(),
            },
        ],
        troubleshooting_dimensions: vec![
            TroubleshootingDimensionSpec {
                dimension: TroubleshootingDimension::CancellationPath
                    .as_str()
                    .to_string(),
                purpose: "Track request/drain/finalize behavior for cancelled runs.".to_string(),
            },
            TroubleshootingDimensionSpec {
                dimension: TroubleshootingDimension::ContractCompliance
                    .as_str()
                    .to_string(),
                purpose: "Validate schema, gate, and policy conformance.".to_string(),
            },
            TroubleshootingDimensionSpec {
                dimension: TroubleshootingDimension::Determinism.as_str().to_string(),
                purpose: "Confirm replay stability and deterministic artifact lineage.".to_string(),
            },
            TroubleshootingDimensionSpec {
                dimension: TroubleshootingDimension::ExternalDependency
                    .as_str()
                    .to_string(),
                purpose: "Isolate third-party/system boundary failures.".to_string(),
            },
            TroubleshootingDimensionSpec {
                dimension: TroubleshootingDimension::OperatorAction
                    .as_str()
                    .to_string(),
                purpose: "Prioritize immediate operator decision paths.".to_string(),
            },
            TroubleshootingDimensionSpec {
                dimension: TroubleshootingDimension::RecoveryPlanning
                    .as_str()
                    .to_string(),
                purpose: "Drive remediation and verify-after-change sequencing.".to_string(),
            },
            TroubleshootingDimensionSpec {
                dimension: TroubleshootingDimension::RuntimeInvariant
                    .as_str()
                    .to_string(),
                purpose: "Connect events to runtime invariant health.".to_string(),
            },
        ],
        compatibility_notes: vec![
            "Additive dimensions/classes may be introduced without baseline schema changes."
                .to_string(),
            "Field removals or semantic redefinitions require a contract-version bump.".to_string(),
            "Unknown baseline flow/event/outcome values are hard validation errors.".to_string(),
        ],
    }
}
1752
/// Validates and classifies a single baseline structured-log event.
///
/// Parses the three baseline coordinates, derives a base severity from the
/// outcome, detects taxonomy conflicts (which escalate severity and add the
/// contract-compliance dimension), and assembles the narrative plus a
/// recommended operator action.
///
/// # Errors
///
/// Returns `Err` with a human-readable message when the flow id, event kind,
/// or outcome class is not a known baseline value (unknown values are hard
/// validation errors per the advanced-observability contract).
pub fn classify_baseline_log_event(
    event: BaselineLogEvent<'_>,
) -> Result<AdvancedLogClassification, String> {
    // Parse all three baseline coordinates up front; any unknown value aborts.
    let flow = BaselineFlowId::parse(event.flow_id)
        .ok_or_else(|| format!("unknown flow_id {}", event.flow_id))?;
    let kind = BaselineEventKind::parse(event.event_kind)
        .ok_or_else(|| format!("unknown event_kind {}", event.event_kind))?;
    let outcome = BaselineOutcomeClass::parse(event.outcome_class)
        .ok_or_else(|| format!("unknown outcome_class {}", event.outcome_class))?;

    let (event_class, mut dimensions, kind_narrative, action_hint) = kind_semantics(kind);
    let mut conflicts = Vec::new();
    // Base severity is derived purely from the outcome; conflicts below may
    // only escalate it.
    let mut severity = match outcome {
        BaselineOutcomeClass::Success => AdvancedSeverity::Info,
        BaselineOutcomeClass::Cancelled => AdvancedSeverity::Warning,
        BaselineOutcomeClass::Failed => AdvancedSeverity::Error,
    };

    // An event kind that is invalid for its flow is a contract contradiction.
    if !flow_allows_event(flow, kind) {
        conflicts.push(AdvancedClassificationConflict::FlowEventMismatch {
            flow_id: event.flow_id.to_string(),
            event_kind: event.event_kind.to_string(),
        });
        // NOTE(review): direct assignment (not `max`) — assumes Critical
        // out-ranks every outcome-derived severity.
        severity = AdvancedSeverity::Critical;
        dimensions.push(TroubleshootingDimension::ContractCompliance);
    }

    // An integration error that claims success contradicts itself.
    if kind == BaselineEventKind::IntegrationError && outcome == BaselineOutcomeClass::Success {
        conflicts.push(AdvancedClassificationConflict::OutcomeEventMismatch {
            event_kind: event.event_kind.to_string(),
            outcome_class: event.outcome_class.to_string(),
        });
        // NOTE(review): `max` escalation assumes `AdvancedSeverity`'s derived
        // `Ord` ranks Critical/Error above Warning/Info — confirm the variant
        // declaration order in the enum (not visible in this file section).
        severity = severity.max(AdvancedSeverity::Error);
        dimensions.push(TroubleshootingDimension::ContractCompliance);
    }

    // Outcome-driven dimensions, then normalize ordering for deterministic output.
    if outcome == BaselineOutcomeClass::Cancelled {
        dimensions.push(TroubleshootingDimension::CancellationPath);
    }
    if outcome == BaselineOutcomeClass::Failed {
        dimensions.push(TroubleshootingDimension::RecoveryPlanning);
    }
    dimensions.sort_unstable();
    dimensions.dedup();
    conflicts.sort();

    let outcome_phrase = match outcome {
        BaselineOutcomeClass::Success => "completed successfully",
        BaselineOutcomeClass::Cancelled => "was cancelled",
        BaselineOutcomeClass::Failed => "failed",
    };

    Ok(AdvancedLogClassification {
        event_class,
        severity,
        dimensions,
        narrative: format!(
            "{}:{} {}. {}",
            event.flow_id, event.event_kind, outcome_phrase, kind_narrative
        ),
        recommended_action: if conflicts.is_empty() {
            action_hint.to_string()
        } else {
            format!(
                "{action_hint} Resolve taxonomy conflicts before trusting downstream automation."
            )
        },
        conflicts,
    })
}
1829
1830pub fn classify_baseline_log_events(
1832 events: &[BaselineLogEvent<'_>],
1833) -> Result<Vec<AdvancedLogClassification>, String> {
1834 events
1835 .iter()
1836 .map(|event| classify_baseline_log_event(*event))
1837 .collect()
1838}
1839
1840fn flow_allows_event(flow: BaselineFlowId, kind: BaselineEventKind) -> bool {
1841 match flow {
1842 BaselineFlowId::Execution => matches!(
1843 kind,
1844 BaselineEventKind::CommandComplete
1845 | BaselineEventKind::CommandStart
1846 | BaselineEventKind::VerificationSummary
1847 ),
1848 BaselineFlowId::Integration => matches!(
1849 kind,
1850 BaselineEventKind::IntegrationError
1851 | BaselineEventKind::IntegrationSync
1852 | BaselineEventKind::VerificationSummary
1853 ),
1854 BaselineFlowId::Remediation => matches!(
1855 kind,
1856 BaselineEventKind::RemediationApply
1857 | BaselineEventKind::RemediationVerify
1858 | BaselineEventKind::VerificationSummary
1859 ),
1860 BaselineFlowId::Replay => matches!(
1861 kind,
1862 BaselineEventKind::ReplayComplete
1863 | BaselineEventKind::ReplayStart
1864 | BaselineEventKind::VerificationSummary
1865 ),
1866 }
1867}
1868
/// Maps a baseline event kind to its advanced-classification semantics.
///
/// Returns, in order:
/// 1. the advanced event class the kind belongs to,
/// 2. the kind's baseline troubleshooting dimensions (callers may append
///    more; the final set is sorted and deduplicated by the caller),
/// 3. a static narrative fragment describing the event, and
/// 4. a static recommended operator action.
fn kind_semantics(
    kind: BaselineEventKind,
) -> (
    AdvancedEventClass,
    Vec<TroubleshootingDimension>,
    &'static str,
    &'static str,
) {
    match kind {
        BaselineEventKind::CommandComplete => (
            AdvancedEventClass::CommandLifecycle,
            vec![
                TroubleshootingDimension::ContractCompliance,
                TroubleshootingDimension::OperatorAction,
            ],
            "Execution gate completed and emitted a deterministic artifact pointer",
            "Review gate summary and continue pipeline progression.",
        ),
        BaselineEventKind::CommandStart => (
            AdvancedEventClass::CommandLifecycle,
            vec![TroubleshootingDimension::OperatorAction],
            "Execution gate started with reproducible command provenance",
            "Monitor for completion and verify emitted command provenance.",
        ),
        BaselineEventKind::IntegrationError => (
            AdvancedEventClass::IntegrationReliability,
            vec![
                TroubleshootingDimension::ExternalDependency,
                TroubleshootingDimension::OperatorAction,
            ],
            "Integration boundary reported an error at an external/system edge",
            "Inspect integration target, retry posture, and boundary adapter diagnostics.",
        ),
        BaselineEventKind::IntegrationSync => (
            AdvancedEventClass::IntegrationReliability,
            vec![TroubleshootingDimension::ExternalDependency],
            "Integration synchronization event captured adapter boundary state",
            "Verify upstream/downstream contract alignment for this sync point.",
        ),
        BaselineEventKind::RemediationApply => (
            AdvancedEventClass::RemediationSafety,
            vec![
                TroubleshootingDimension::ContractCompliance,
                TroubleshootingDimension::RecoveryPlanning,
            ],
            "Remediation apply phase executed against diagnosed findings",
            "Confirm changes are scoped and queue remediation verification.",
        ),
        BaselineEventKind::RemediationVerify => (
            AdvancedEventClass::RemediationSafety,
            vec![
                TroubleshootingDimension::ContractCompliance,
                TroubleshootingDimension::RecoveryPlanning,
            ],
            "Post-remediation verification assessed health deltas and invariants",
            "Evaluate health delta and close or reopen remediation loops.",
        ),
        BaselineEventKind::ReplayComplete => (
            AdvancedEventClass::ReplayDeterminism,
            vec![
                TroubleshootingDimension::Determinism,
                TroubleshootingDimension::RuntimeInvariant,
            ],
            "Replay completion captured deterministic scenario convergence status",
            "Compare replay artifacts against baseline and investigate divergence.",
        ),
        BaselineEventKind::ReplayStart => (
            AdvancedEventClass::ReplayDeterminism,
            vec![TroubleshootingDimension::Determinism],
            "Replay start established deterministic execution context",
            "Track replay progress and preserve trace/evidence join keys.",
        ),
        BaselineEventKind::VerificationSummary => (
            AdvancedEventClass::VerificationGovernance,
            vec![
                TroubleshootingDimension::ContractCompliance,
                TroubleshootingDimension::Determinism,
                TroubleshootingDimension::RuntimeInvariant,
            ],
            "Verification summary synthesized gate outcomes for governance review",
            "Use summary to decide promotion, rollback, or targeted investigation.",
        ),
    }
}
1953
1954#[cfg(test)]
1955#[allow(clippy::arc_with_non_send_sync)]
1956mod tests {
1957 use super::*;
1958 use crate::record::obligation::{ObligationKind, ObligationRecord};
1959 use crate::record::region::RegionRecord;
1960 use crate::record::task::{TaskRecord, TaskState};
1961 use crate::time::{TimerDriverHandle, VirtualClock};
1962 use crate::types::{Budget, CancelReason, Outcome};
1963 use crate::util::ArenaIndex;
1964 use std::sync::Arc;
1965
    /// Shared per-test setup: initialize test logging and mark the test phase.
    fn init_test(name: &str) {
        crate::test_utils::init_test_logging();
        crate::test_phase!(name);
    }
1970
    /// Inserts a child region under `parent` and wires it into the parent's
    /// child list, returning the new region's id.
    ///
    /// The record is created with a placeholder id (the arena assigns the real
    /// index on insert) and patched afterwards.
    fn insert_child_region(state: &mut RuntimeState, parent: RegionId) -> RegionId {
        let idx = state.regions.insert(RegionRecord::new(
            RegionId::from_arena(ArenaIndex::new(0, 0)),
            Some(parent),
            Budget::INFINITE,
        ));
        let id = RegionId::from_arena(idx);
        // Patch the placeholder id with the arena-derived one.
        let record = state.regions.get_mut(idx).expect("child region missing");
        record.id = id;
        let added = state
            .regions
            .get(parent.arena_index())
            .expect("parent missing")
            .add_child(id);
        crate::assert_with_log!(added.is_ok(), "child added", true, added.is_ok());
        id
    }
1988
    /// Inserts a task into `region` with the given initial `task_state` and
    /// registers it on the region, returning the new task's id.
    ///
    /// Like `insert_child_region`, the record starts with a placeholder id
    /// that is patched after the arena insert assigns the real index.
    fn insert_task(state: &mut RuntimeState, region: RegionId, task_state: TaskState) -> TaskId {
        let idx = state.insert_task(TaskRecord::new(
            TaskId::from_arena(ArenaIndex::new(0, 0)),
            region,
            Budget::INFINITE,
        ));
        let id = TaskId::from_arena(idx);
        // Patch the placeholder id and force the requested state.
        let record = state.task_mut(id).expect("task missing");
        record.id = id;
        record.state = task_state;
        let added = state
            .regions
            .get(region.arena_index())
            .expect("region missing")
            .add_task(id);
        crate::assert_with_log!(added.is_ok(), "task added", true, added.is_ok());
        id
    }
2007
    /// Inserts an obligation of `kind` held by `holder` in `region`, reserved
    /// at `reserved_at`, returning its id (placeholder-id pattern as above).
    fn insert_obligation(
        state: &mut RuntimeState,
        region: RegionId,
        holder: TaskId,
        kind: ObligationKind,
        reserved_at: Time,
    ) -> ObligationId {
        let idx = state.obligations.insert(ObligationRecord::new(
            ObligationId::from_arena(ArenaIndex::new(0, 0)),
            kind,
            holder,
            region,
            reserved_at,
        ));
        let id = ObligationId::from_arena(idx);
        // Patch the placeholder id with the arena-derived one.
        let record = state.obligations.get_mut(idx).expect("obligation missing");
        record.id = id;
        id
    }
2027
    /// `explain_region_open` on an id that was never inserted: expect no
    /// region state, exactly one `RegionNotFound` reason, and a
    /// "Verify region id" recommendation.
    #[test]
    fn test_explain_region_open_unknown_region_returns_reason() {
        init_test("test_explain_region_open_unknown_region_returns_reason");
        let state = Arc::new(RuntimeState::new());
        let diagnostics = Diagnostics::new(state);
        let missing = RegionId::new_for_test(99, 0);

        let explanation = diagnostics.explain_region_open(missing);
        crate::assert_with_log!(
            explanation.region_state.is_none(),
            "region_state none",
            true,
            explanation.region_state.is_none()
        );
        crate::assert_with_log!(
            explanation.reasons.len() == 1,
            "single reason",
            1usize,
            explanation.reasons.len()
        );
        let is_not_found = matches!(explanation.reasons.first(), Some(Reason::RegionNotFound));
        crate::assert_with_log!(is_not_found, "region not found reason", true, is_not_found);
        let has_recommendation = explanation
            .recommendations
            .iter()
            .any(|rec| rec.contains("Verify region id"));
        crate::assert_with_log!(
            has_recommendation,
            "recommendation present",
            true,
            has_recommendation
        );
        crate::test_complete!("test_explain_region_open_unknown_region_returns_reason");
    }
2062
    /// A fully closed region (close → finalize → complete) should explain as
    /// `Closed` with no reasons and no recommendations.
    #[test]
    fn test_explain_region_open_closed_region_has_no_reasons() {
        init_test("test_explain_region_open_closed_region_has_no_reasons");
        let mut state = RuntimeState::new();
        let root = state.create_root_region(Budget::INFINITE);
        let region = state.region(root).expect("root missing");
        // Drive the region through its full close lifecycle.
        let did_close =
            region.begin_close(None) && region.begin_finalize() && region.complete_close();
        crate::assert_with_log!(did_close, "region closed", true, did_close);

        let diagnostics = Diagnostics::new(Arc::new(state));
        let explanation = diagnostics.explain_region_open(root);
        crate::assert_with_log!(
            explanation.region_state == Some(RegionState::Closed),
            "closed state",
            true,
            explanation.region_state == Some(RegionState::Closed)
        );
        crate::assert_with_log!(
            explanation.reasons.is_empty(),
            "no reasons",
            true,
            explanation.reasons.is_empty()
        );
        crate::assert_with_log!(
            explanation.recommendations.is_empty(),
            "no recommendations",
            true,
            explanation.recommendations.is_empty()
        );
        crate::test_complete!("test_explain_region_open_closed_region_has_no_reasons");
    }
2095
    /// An open region with a child region, a running task, and a held
    /// obligation should report all three as reasons, emit matching
    /// recommendations, and render them in the `Display` output.
    #[test]
    fn test_explain_region_open_reports_children_tasks_obligations() {
        init_test("test_explain_region_open_reports_children_tasks_obligations");
        let mut state = RuntimeState::new();
        let root = state.create_root_region(Budget::INFINITE);
        let child = insert_child_region(&mut state, root);

        let task_id = insert_task(&mut state, root, TaskState::Running);
        // Give the task a non-zero poll count so the reason carries it.
        let task = state.task_mut(task_id).expect("task missing");
        task.total_polls = 7;

        let obligation_id = insert_obligation(
            &mut state,
            root,
            task_id,
            ObligationKind::SendPermit,
            Time::from_millis(10),
        );

        let diagnostics = Diagnostics::new(Arc::new(state));
        let explanation = diagnostics.explain_region_open(root);

        // Scan the reasons for each of the three expected entries.
        let mut saw_child = false;
        let mut saw_task = false;
        let mut saw_obligation = false;
        for reason in &explanation.reasons {
            match reason {
                Reason::ChildRegionOpen { child_id, .. } if *child_id == child => {
                    saw_child = true;
                }
                Reason::TaskRunning {
                    task_id: id,
                    poll_count,
                    ..
                } if *id == task_id && *poll_count == 7 => {
                    saw_task = true;
                }
                Reason::ObligationHeld {
                    obligation_id: id,
                    holder_task,
                    ..
                } if *id == obligation_id && *holder_task == task_id => {
                    saw_obligation = true;
                }
                _ => {}
            }
        }
        crate::assert_with_log!(saw_child, "child reason", true, saw_child);
        crate::assert_with_log!(saw_task, "task reason", true, saw_task);
        crate::assert_with_log!(saw_obligation, "obligation reason", true, saw_obligation);

        // Each reason category should produce a matching recommendation.
        let recs = &explanation.recommendations;
        let has_child_rec = recs.iter().any(|r| r.contains("child regions"));
        let has_task_rec = recs.iter().any(|r| r.contains("live tasks"));
        let has_obligation_rec = recs.iter().any(|r| r.contains("obligations"));
        crate::assert_with_log!(has_child_rec, "child rec", true, has_child_rec);
        crate::assert_with_log!(has_task_rec, "task rec", true, has_task_rec);
        crate::assert_with_log!(
            has_obligation_rec,
            "obligation rec",
            true,
            has_obligation_rec
        );

        // The Display rendering should surface the same findings.
        let rendered = explanation.to_string();
        crate::assert_with_log!(
            rendered.contains("child region"),
            "display includes child",
            true,
            rendered.contains("child region")
        );
        crate::assert_with_log!(
            rendered.contains("obligation"),
            "display includes obligation",
            true,
            rendered.contains("obligation")
        );
        crate::test_complete!("test_explain_region_open_reports_children_tasks_obligations");
    }
2175
    /// Explaining a mid-tree region reports its immediate open child (the
    /// grandchild of root) as a `ChildRegionOpen` reason.
    #[test]
    fn test_explain_region_open_nested_child_reports_immediate_child() {
        init_test("test_explain_region_open_nested_child_reports_immediate_child");
        let mut state = RuntimeState::new();
        let root = state.create_root_region(Budget::INFINITE);
        let child = insert_child_region(&mut state, root);
        let grandchild = insert_child_region(&mut state, child);

        let diagnostics = Diagnostics::new(Arc::new(state));
        let explanation = diagnostics.explain_region_open(child);

        let saw_grandchild = explanation.reasons.iter().any(|reason| {
            matches!(
                reason,
                Reason::ChildRegionOpen { child_id, .. } if *child_id == grandchild
            )
        });
        crate::assert_with_log!(saw_grandchild, "grandchild reason", true, saw_grandchild);
        crate::test_complete!("test_explain_region_open_nested_child_reports_immediate_child");
    }
2196
    /// A running task that has been wake-notified should be explained as
    /// `AwaitingSchedule`, and its waiters should appear in the details.
    #[test]
    fn test_explain_task_blocked_running_notified_reports_schedule() {
        init_test("test_explain_task_blocked_running_notified_reports_schedule");
        let mut state = RuntimeState::new();
        let root = state.create_root_region(Budget::INFINITE);
        let task_id = insert_task(&mut state, root, TaskState::Running);
        let task = state.task_mut(task_id).expect("task missing");
        // Mark the task as notified (woken but not yet scheduled).
        let notified = task.wake_state.notify();
        crate::assert_with_log!(notified, "wake notified", true, notified);
        task.waiters.push(TaskId::new_for_test(77, 0));

        let diagnostics = Diagnostics::new(Arc::new(state));
        let explanation = diagnostics.explain_task_blocked(task_id);
        crate::assert_with_log!(
            matches!(explanation.block_reason, BlockReason::AwaitingSchedule),
            "awaiting schedule",
            true,
            matches!(explanation.block_reason, BlockReason::AwaitingSchedule)
        );
        let has_waiters = explanation.details.iter().any(|d| d.contains("waiters"));
        crate::assert_with_log!(has_waiters, "waiters detail", true, has_waiters);
        crate::test_complete!("test_explain_task_blocked_running_notified_reports_schedule");
    }
2220
    /// A task in `CancelRequested` should be explained with a
    /// `CancelRequested` block reason carrying the user cancel kind and
    /// message, and the `Display` output should mention the cancellation.
    #[test]
    fn test_explain_task_blocked_cancel_requested_includes_reason() {
        init_test("test_explain_task_blocked_cancel_requested_includes_reason");
        let mut state = RuntimeState::new();
        let root = state.create_root_region(Budget::INFINITE);
        let reason = CancelReason::user("stop");
        let cleanup_budget = reason.cleanup_budget();
        let task_id = insert_task(
            &mut state,
            root,
            TaskState::CancelRequested {
                reason,
                cleanup_budget,
            },
        );

        let diagnostics = Diagnostics::new(Arc::new(state));
        let explanation = diagnostics.explain_task_blocked(task_id);
        let matches_reason = matches!(
            explanation.block_reason,
            BlockReason::CancelRequested {
                reason: CancelReasonInfo {
                    kind: CancelKind::User,
                    message: Some(_)
                }
            }
        );
        crate::assert_with_log!(matches_reason, "cancel requested", true, matches_reason);
        let rendered = explanation.to_string();
        crate::assert_with_log!(
            rendered.contains("cancel requested"),
            "display includes cancel",
            true,
            rendered.contains("cancel requested")
        );
        crate::test_complete!("test_explain_task_blocked_cancel_requested_includes_reason");
    }
2258
    /// A completed task is not blocked: `explain_task_blocked` should report
    /// `BlockReason::Completed`.
    #[test]
    fn test_explain_task_blocked_completed_reports_completed() {
        init_test("test_explain_task_blocked_completed_reports_completed");
        let mut state = RuntimeState::new();
        let root = state.create_root_region(Budget::INFINITE);
        let task_id = insert_task(&mut state, root, TaskState::Completed(Outcome::Ok(())));

        let diagnostics = Diagnostics::new(Arc::new(state));
        let explanation = diagnostics.explain_task_blocked(task_id);
        crate::assert_with_log!(
            matches!(explanation.block_reason, BlockReason::Completed),
            "completed",
            true,
            matches!(explanation.block_reason, BlockReason::Completed)
        );
        crate::test_complete!("test_explain_task_blocked_completed_reports_completed");
    }
2276
2277 #[test]
2278 fn test_find_leaked_obligations_sorted_and_aged() {
2279 init_test("test_find_leaked_obligations_sorted_and_aged");
2280 let mut state = RuntimeState::new();
2281 let root = state.create_root_region(Budget::INFINITE);
2282 let child = insert_child_region(&mut state, root);
2283
2284 let clock = Arc::new(VirtualClock::starting_at(Time::from_millis(100)));
2285 state.set_timer_driver(TimerDriverHandle::with_virtual_clock(Arc::clone(&clock)));
2286
2287 let root_task = insert_task(&mut state, root, TaskState::Running);
2288 let child_task = insert_task(&mut state, child, TaskState::Running);
2289
2290 let root_ob = insert_obligation(
2291 &mut state,
2292 root,
2293 root_task,
2294 ObligationKind::Ack,
2295 Time::from_millis(10),
2296 );
2297 let child_ob = insert_obligation(
2298 &mut state,
2299 child,
2300 child_task,
2301 ObligationKind::Lease,
2302 Time::from_millis(20),
2303 );
2304
2305 let diagnostics = Diagnostics::new(Arc::new(state));
2306 let leaks = diagnostics.find_leaked_obligations();
2307 crate::assert_with_log!(leaks.len() == 2, "two leaks", 2usize, leaks.len());
2308
2309 crate::assert_with_log!(
2310 leaks[0].region_id == root,
2311 "root first",
2312 true,
2313 leaks[0].region_id == root
2314 );
2315 crate::assert_with_log!(
2316 leaks[1].region_id == child,
2317 "child second",
2318 true,
2319 leaks[1].region_id == child
2320 );
2321 crate::assert_with_log!(
2322 leaks[0].obligation_id == root_ob,
2323 "root obligation id",
2324 true,
2325 leaks[0].obligation_id == root_ob
2326 );
2327 crate::assert_with_log!(
2328 leaks[1].obligation_id == child_ob,
2329 "child obligation id",
2330 true,
2331 leaks[1].obligation_id == child_ob
2332 );
2333
2334 let root_age_ms = leaks[0].age.as_millis();
2335 let child_age_ms = leaks[1].age.as_millis();
2336 crate::assert_with_log!(root_age_ms == 90, "root age", 90u128, root_age_ms);
2337 crate::assert_with_log!(child_age_ms == 80, "child age", 80u128, child_age_ms);
2338
2339 crate::test_complete!("test_find_leaked_obligations_sorted_and_aged");
2340 }
2341
2342 #[test]
2345 fn reason_debug_clone() {
2346 let r = Reason::RegionNotFound;
2347 let r2 = r;
2348 assert!(format!("{r2:?}").contains("RegionNotFound"));
2349 }
2350
2351 #[test]
2352 fn reason_display_all_variants() {
2353 let r1 = Reason::RegionNotFound;
2354 assert!(r1.to_string().contains("not found"));
2355
2356 let r2 = Reason::ChildRegionOpen {
2357 child_id: RegionId::new_for_test(1, 0),
2358 child_state: RegionState::Open,
2359 };
2360 assert!(r2.to_string().contains("child region"));
2361
2362 let r3 = Reason::TaskRunning {
2363 task_id: TaskId::new_for_test(1, 0),
2364 task_state: "Running".into(),
2365 poll_count: 5,
2366 };
2367 assert!(r3.to_string().contains("task"));
2368 assert!(r3.to_string().contains("polls=5"));
2369
2370 let r4 = Reason::ObligationHeld {
2371 obligation_id: ObligationId::new_for_test(1, 0),
2372 obligation_type: "Lease".into(),
2373 holder_task: TaskId::new_for_test(2, 0),
2374 };
2375 assert!(r4.to_string().contains("obligation"));
2376 assert!(r4.to_string().contains("Lease"));
2377 }
2378
2379 #[test]
2380 fn region_open_explanation_debug_clone() {
2381 let explanation = RegionOpenExplanation {
2382 region_id: RegionId::new_for_test(1, 0),
2383 region_state: Some(RegionState::Open),
2384 reasons: vec![Reason::RegionNotFound],
2385 recommendations: vec!["check it".into()],
2386 };
2387 let explanation2 = explanation;
2388 assert!(format!("{explanation2:?}").contains("RegionOpenExplanation"));
2389 }
2390
2391 #[test]
2392 fn region_open_explanation_display() {
2393 let explanation = RegionOpenExplanation {
2394 region_id: RegionId::new_for_test(1, 0),
2395 region_state: Some(RegionState::Open),
2396 reasons: vec![Reason::RegionNotFound],
2397 recommendations: vec!["fix it".into()],
2398 };
2399 let s = explanation.to_string();
2400 assert!(s.contains("still open"));
2401 assert!(s.contains("fix it"));
2402 }
2403
2404 #[test]
2405 fn task_blocked_explanation_debug_clone() {
2406 let explanation = TaskBlockedExplanation {
2407 task_id: TaskId::new_for_test(1, 0),
2408 block_reason: BlockReason::NotStarted,
2409 details: vec!["detail".into()],
2410 recommendations: vec!["wait".into()],
2411 };
2412 let explanation2 = explanation;
2413 assert!(format!("{explanation2:?}").contains("TaskBlockedExplanation"));
2414 }
2415
2416 #[test]
2417 fn task_blocked_explanation_display() {
2418 let explanation = TaskBlockedExplanation {
2419 task_id: TaskId::new_for_test(1, 0),
2420 block_reason: BlockReason::AwaitingSchedule,
2421 details: vec!["pending wake".into()],
2422 recommendations: vec!["wait for scheduler".into()],
2423 };
2424 let s = explanation.to_string();
2425 assert!(s.contains("blocked"));
2426 assert!(s.contains("awaiting schedule"));
2427 }
2428
2429 #[test]
2430 fn block_reason_debug_clone() {
2431 let r = BlockReason::TaskNotFound;
2432 let r2 = r;
2433 assert!(format!("{r2:?}").contains("TaskNotFound"));
2434 }
2435
2436 #[test]
2437 fn block_reason_display_all_variants() {
2438 let variants: Vec<BlockReason> = vec![
2439 BlockReason::TaskNotFound,
2440 BlockReason::NotStarted,
2441 BlockReason::AwaitingSchedule,
2442 BlockReason::AwaitingFuture {
2443 description: "channel recv".into(),
2444 },
2445 BlockReason::CancelRequested {
2446 reason: CancelReasonInfo {
2447 kind: CancelKind::User,
2448 message: Some("stop".into()),
2449 },
2450 },
2451 BlockReason::RunningCleanup {
2452 reason: CancelReasonInfo {
2453 kind: CancelKind::User,
2454 message: None,
2455 },
2456 polls_remaining: 10,
2457 },
2458 BlockReason::Finalizing {
2459 reason: CancelReasonInfo {
2460 kind: CancelKind::User,
2461 message: None,
2462 },
2463 polls_remaining: 5,
2464 },
2465 BlockReason::Completed,
2466 ];
2467 for v in &variants {
2468 assert!(!v.to_string().is_empty());
2469 }
2470 }
2471
2472 #[test]
2473 fn cancellation_explanation_debug_clone() {
2474 let explanation = CancellationExplanation {
2475 kind: CancelKind::User,
2476 message: Some("timeout".into()),
2477 propagation_path: vec![CancellationStep {
2478 region_id: RegionId::new_for_test(1, 0),
2479 kind: CancelKind::User,
2480 }],
2481 };
2482 let explanation2 = explanation;
2483 assert!(format!("{explanation2:?}").contains("CancellationExplanation"));
2484 }
2485
2486 #[test]
2487 fn cancellation_step_debug_clone() {
2488 let step = CancellationStep {
2489 region_id: RegionId::new_for_test(1, 0),
2490 kind: CancelKind::User,
2491 };
2492 let step2 = step;
2493 assert!(format!("{step2:?}").contains("CancellationStep"));
2494 }
2495
2496 #[test]
2497 fn cancel_reason_info_debug_clone_display() {
2498 let info = CancelReasonInfo {
2499 kind: CancelKind::User,
2500 message: Some("stop".into()),
2501 };
2502 let info2 = info.clone();
2503 assert!(format!("{info2:?}").contains("CancelReasonInfo"));
2504 let s = info.to_string();
2505 assert!(s.contains("stop"));
2506
2507 let info_no_msg = CancelReasonInfo {
2508 kind: CancelKind::User,
2509 message: None,
2510 };
2511 assert!(!info_no_msg.to_string().is_empty());
2512 }
2513
2514 #[test]
2515 fn obligation_leak_debug_clone() {
2516 let leak = ObligationLeak {
2517 obligation_id: ObligationId::new_for_test(1, 0),
2518 obligation_type: "Ack".into(),
2519 holder_task: Some(TaskId::new_for_test(2, 0)),
2520 region_id: RegionId::new_for_test(1, 0),
2521 age: std::time::Duration::from_mins(1),
2522 };
2523 let leak2 = leak;
2524 assert!(format!("{leak2:?}").contains("ObligationLeak"));
2525 }
2526
2527 #[test]
2528 fn directional_deadlock_cycle_detection_reports_critical() {
2529 let mut state = RuntimeState::new();
2530 let root = state.create_root_region(Budget::INFINITE);
2531 let t1 = insert_task(&mut state, root, TaskState::Running);
2532 let t2 = insert_task(&mut state, root, TaskState::Running);
2533 state.task_mut(t1).expect("t1").waiters.push(t2); state.task_mut(t2).expect("t2").waiters.push(t1); let diagnostics = Diagnostics::new(Arc::new(state));
2537 let report = diagnostics.analyze_directional_deadlock();
2538 assert_eq!(report.severity, DeadlockSeverity::Critical);
2539 assert!(!report.cycles.is_empty());
2540 assert!(report.cycles[0].trapped);
2541 assert!(report.cycles[0].tasks.contains(&t1));
2542 assert!(report.cycles[0].tasks.contains(&t2));
2543 }
2544
2545 #[test]
2546 fn structural_health_reports_deadlocked_for_trapped_cycle() {
2547 let mut state = RuntimeState::new();
2548 let root = state.create_root_region(Budget::INFINITE);
2549 let t1 = insert_task(&mut state, root, TaskState::Running);
2550 let t2 = insert_task(&mut state, root, TaskState::Running);
2551 state.task_mut(t1).expect("t1").waiters.push(t2);
2552 state.task_mut(t2).expect("t2").waiters.push(t1);
2553
2554 let diagnostics = Diagnostics::new(Arc::new(state));
2555 let report = diagnostics.analyze_structural_health();
2556 assert!(matches!(
2557 report.classification,
2558 crate::observability::spectral_health::HealthClassification::Deadlocked
2559 ));
2560 }
2561
2562 #[test]
2563 fn explain_region_open_includes_directional_deadlock_recommendation() {
2564 let mut state = RuntimeState::new();
2565 let root = state.create_root_region(Budget::INFINITE);
2566 let t1 = insert_task(&mut state, root, TaskState::Running);
2567 let t2 = insert_task(&mut state, root, TaskState::Running);
2568 state.task_mut(t1).expect("t1").waiters.push(t2);
2569 state.task_mut(t2).expect("t2").waiters.push(t1);
2570
2571 let diagnostics = Diagnostics::new(Arc::new(state));
2572 let explanation = diagnostics.explain_region_open(root);
2573 assert!(
2574 explanation
2575 .recommendations
2576 .iter()
2577 .any(|r| r.contains("Directional deadlock risk")),
2578 "expected directional deadlock recommendation"
2579 );
2580 }
2581
2582 #[test]
2583 fn advanced_observability_contract_has_sorted_dimensions_and_classes() {
2584 let contract = advanced_observability_contract();
2585
2586 let classes: Vec<&str> = contract
2587 .event_classes
2588 .iter()
2589 .map(|item| item.class_id.as_str())
2590 .collect();
2591 let mut sorted_classes = classes.clone();
2592 sorted_classes.sort_unstable();
2593 sorted_classes.dedup();
2594 assert_eq!(classes, sorted_classes);
2595
2596 let dimensions: Vec<&str> = contract
2597 .troubleshooting_dimensions
2598 .iter()
2599 .map(|item| item.dimension.as_str())
2600 .collect();
2601 let mut sorted_dimensions = dimensions.clone();
2602 sorted_dimensions.sort_unstable();
2603 sorted_dimensions.dedup();
2604 assert_eq!(dimensions, sorted_dimensions);
2605 }
2606
2607 #[test]
2608 fn classify_baseline_log_event_maps_known_event() {
2609 let classified = classify_baseline_log_event(BaselineLogEvent {
2610 flow_id: "execution",
2611 event_kind: "command_start",
2612 outcome_class: "success",
2613 })
2614 .expect("classification should succeed");
2615
2616 assert_eq!(classified.event_class, AdvancedEventClass::CommandLifecycle);
2617 assert_eq!(classified.severity, AdvancedSeverity::Info);
2618 assert!(classified.conflicts.is_empty());
2619 assert!(
2620 classified
2621 .dimensions
2622 .contains(&TroubleshootingDimension::OperatorAction)
2623 );
2624 }
2625
2626 #[test]
2627 fn classify_baseline_log_event_detects_flow_event_conflict() {
2628 let classified = classify_baseline_log_event(BaselineLogEvent {
2629 flow_id: "execution",
2630 event_kind: "integration_sync",
2631 outcome_class: "success",
2632 })
2633 .expect("classification should succeed with conflict");
2634
2635 assert_eq!(classified.severity, AdvancedSeverity::Critical);
2636 assert!(classified.conflicts.iter().any(|conflict| matches!(
2637 conflict,
2638 AdvancedClassificationConflict::FlowEventMismatch { .. }
2639 )));
2640 }
2641
2642 #[test]
2643 fn classify_baseline_log_event_detects_outcome_event_conflict() {
2644 let classified = classify_baseline_log_event(BaselineLogEvent {
2645 flow_id: "integration",
2646 event_kind: "integration_error",
2647 outcome_class: "success",
2648 })
2649 .expect("classification should succeed with conflict");
2650
2651 assert_eq!(
2652 classified.event_class,
2653 AdvancedEventClass::IntegrationReliability
2654 );
2655 assert_eq!(classified.severity, AdvancedSeverity::Error);
2656 assert!(classified.conflicts.iter().any(|conflict| matches!(
2657 conflict,
2658 AdvancedClassificationConflict::OutcomeEventMismatch { .. }
2659 )));
2660 }
2661
2662 #[test]
2663 fn classify_baseline_log_events_is_deterministic() {
2664 let stream = vec![
2665 BaselineLogEvent {
2666 flow_id: "execution",
2667 event_kind: "command_start",
2668 outcome_class: "success",
2669 },
2670 BaselineLogEvent {
2671 flow_id: "execution",
2672 event_kind: "verification_summary",
2673 outcome_class: "failed",
2674 },
2675 BaselineLogEvent {
2676 flow_id: "replay",
2677 event_kind: "replay_complete",
2678 outcome_class: "cancelled",
2679 },
2680 ];
2681
2682 let a = classify_baseline_log_events(&stream).expect("stream classification should pass");
2683 let b = classify_baseline_log_events(&stream).expect("stream classification should pass");
2684 assert_eq!(a, b);
2685 assert!(!a.is_empty());
2686 assert!(a.iter().all(|entry| !entry.narrative.is_empty()));
2687 }
2688
2689 #[test]
2690 fn classify_baseline_log_event_rejects_unknown_tokens() {
2691 let err = classify_baseline_log_event(BaselineLogEvent {
2692 flow_id: "unknown",
2693 event_kind: "command_start",
2694 outcome_class: "success",
2695 })
2696 .expect_err("unknown flow must be rejected");
2697 assert!(err.contains("unknown flow_id"));
2698 }
2699
2700 #[test]
2701 fn tail_latency_taxonomy_contract_has_unique_required_keys() {
2702 let contract = tail_latency_taxonomy_contract();
2703 let keys: Vec<&str> = contract
2704 .required_log_fields
2705 .iter()
2706 .map(|field| field.key.as_str())
2707 .collect();
2708 let mut unique_keys = keys.clone();
2709 unique_keys.sort_unstable();
2710 unique_keys.dedup();
2711 assert_eq!(keys.len(), unique_keys.len());
2712 }
2713
2714 #[test]
2715 fn tail_latency_taxonomy_contract_includes_unknown_bucket_and_signals() {
2716 let contract = tail_latency_taxonomy_contract();
2717 assert_eq!(
2718 contract.contract_version,
2719 TAIL_LATENCY_TAXONOMY_CONTRACT_VERSION
2720 );
2721 assert_eq!(contract.unknown_bucket_key, "tail.unknown.unmeasured_ns");
2722 assert!(
2723 contract
2724 .required_log_fields
2725 .iter()
2726 .any(|field| field.key == contract.unknown_bucket_key && field.required)
2727 );
2728 assert!(contract.terms.iter().any(|term| {
2729 term.term_id == "unknown"
2730 && term.direct_duration_key == "tail.unknown.unmeasured_ns"
2731 && term
2732 .signals
2733 .iter()
2734 .any(|signal| signal.structured_log_key == "tail.unknown.unmeasured_ns")
2735 }));
2736 }
2737
2738 #[test]
2739 fn tail_latency_taxonomy_contract_core_signals_have_existing_files() {
2740 let contract = tail_latency_taxonomy_contract();
2741 let repo_root = std::path::Path::new(env!("CARGO_MANIFEST_DIR"));
2742 for signal in contract
2743 .terms
2744 .iter()
2745 .flat_map(|term| term.signals.iter())
2746 .filter(|signal| signal.core)
2747 {
2748 assert!(
2749 repo_root.join(&signal.producer_file).exists(),
2750 "producer file must exist: {}",
2751 signal.producer_file
2752 );
2753 }
2754 }
2755
2756 #[test]
2757 fn diagnostics_debug() {
2758 let state = Arc::new(RuntimeState::new());
2759 let diagnostics = Diagnostics::new(state);
2760 assert!(format!("{diagnostics:?}").contains("Diagnostics"));
2761 }
2762}