use crate::observability::metrics::MetricsProvider;
use crate::types::{ObligationId, Outcome, RegionId, TaskId, outcome::PanicPayload};
use std::backtrace::Backtrace;
use std::collections::BTreeMap;
use std::fmt;
use std::panic::AssertUnwindSafe;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use parking_lot::Mutex;
use std::time::Instant;
static PANIC_COUNTER: AtomicU64 = AtomicU64::new(1);
#[derive(Debug, Clone)]
pub struct PanicIsolationConfig {
pub capture_backtraces: bool,
pub enable_panic_logging: bool,
pub panic_threshold_per_region: Option<u32>,
pub isolate_finalizer_panics: bool,
pub isolate_task_panics: bool,
}
impl Default for PanicIsolationConfig {
fn default() -> Self {
Self {
capture_backtraces: cfg!(debug_assertions),
enable_panic_logging: true,
panic_threshold_per_region: Some(10),
isolate_task_panics: true,
isolate_finalizer_panics: true,
}
}
}
#[derive(Debug, Clone)]
pub struct PanicContext {
pub panic_id: u64,
pub location: PanicLocation,
pub timestamp: Instant,
pub panic_message: Option<String>,
pub backtrace: Option<String>,
pub region_id: Option<RegionId>,
pub task_id: Option<TaskId>,
pub obligation_id: Option<ObligationId>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PanicLocation {
TaskExecution {
task_id: TaskId,
region_id: RegionId,
poll_attempt: u32,
},
FinalizerExecution {
region_id: RegionId,
finalizer_type: FinalizerType,
},
RegionCleanup {
region_id: RegionId,
cleanup_phase: CleanupPhase,
},
ObligationHandling {
obligation_id: ObligationId,
region_id: RegionId,
},
SchedulerInternal {
worker_id: Option<usize>,
operation: String,
},
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FinalizerType {
Sync,
Async,
Custom(String),
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CleanupPhase {
Finalizers,
ObligationResolution,
ResourceCleanup,
StateTransition,
}
#[derive(Debug, Clone)]
pub enum PanicIsolationResult<T> {
Success(T),
Panicked(PanicContext),
Skipped {
reason: String,
context: PanicContext,
},
}
impl<T> PanicIsolationResult<T> {
pub fn is_success(&self) -> bool {
matches!(self, PanicIsolationResult::Success(_))
}
pub fn is_panicked(&self) -> bool {
matches!(self, PanicIsolationResult::Panicked(_))
}
pub fn into_success(self) -> Option<T> {
match self {
PanicIsolationResult::Success(value) => Some(value),
_ => None,
}
}
pub fn panic_context(&self) -> Option<&PanicContext> {
match self {
PanicIsolationResult::Panicked(ctx)
| PanicIsolationResult::Skipped { context: ctx, .. } => Some(ctx),
PanicIsolationResult::Success(_) => None,
}
}
}
pub struct PanicIsolator {
config: PanicIsolationConfig,
metrics: Arc<dyn MetricsProvider>,
region_panic_counts: Mutex<BTreeMap<RegionId, u32>>,
}
impl PanicIsolator {
pub fn new(config: PanicIsolationConfig, metrics: Arc<dyn MetricsProvider>) -> Self {
Self {
config,
metrics,
region_panic_counts: Mutex::new(BTreeMap::new()),
}
}
pub fn isolate_task_execution<F, T>(
&self,
task_id: TaskId,
region_id: RegionId,
poll_attempt: u32,
operation: F,
) -> PanicIsolationResult<T>
where
F: FnOnce() -> T,
{
if !self.config.isolate_task_panics {
return PanicIsolationResult::Success(operation());
}
let location = PanicLocation::TaskExecution {
task_id,
region_id: region_id,
poll_attempt,
};
self.isolate_operation(location, operation)
}
pub fn isolate_finalizer_execution<F, T>(
&self,
region_id: RegionId,
finalizer_type: FinalizerType,
operation: F,
) -> PanicIsolationResult<T>
where
F: FnOnce() -> T,
{
if !self.config.isolate_finalizer_panics {
return PanicIsolationResult::Success(operation());
}
let location = PanicLocation::FinalizerExecution {
region_id: region_id,
finalizer_type,
};
self.isolate_operation(location, operation)
}
pub fn isolate_region_cleanup<F, T>(
&self,
region_id: RegionId,
phase: CleanupPhase,
operation: F,
) -> PanicIsolationResult<T>
where
F: FnOnce() -> T,
{
let location = PanicLocation::RegionCleanup {
region_id: region_id,
cleanup_phase: phase,
};
self.isolate_operation(location, operation)
}
pub fn isolate_obligation_handling<F, T>(
&self,
obligation_id: ObligationId,
region_id: RegionId,
operation: F,
) -> PanicIsolationResult<T>
where
F: FnOnce() -> T,
{
let location = PanicLocation::ObligationHandling {
obligation_id,
region_id: region_id,
};
self.isolate_operation(location, operation)
}
pub fn isolate_scheduler_operation<F, T>(
&self,
worker_id: Option<usize>,
operation_name: String,
operation: F,
) -> PanicIsolationResult<T>
where
F: FnOnce() -> T,
{
let location = PanicLocation::SchedulerInternal {
worker_id,
operation: operation_name,
};
self.isolate_operation(location, operation)
}
fn isolate_operation<F, T>(
&self,
location: PanicLocation,
operation: F,
) -> PanicIsolationResult<T>
where
F: FnOnce() -> T,
{
if let Some((reason, context)) = self.skip_context_for_threshold(&location) {
if self.config.enable_panic_logging {
self.report_skip(&reason, &context);
}
return PanicIsolationResult::Skipped { reason, context };
}
match std::panic::catch_unwind(AssertUnwindSafe(operation)) {
Ok(result) => PanicIsolationResult::Success(result),
Err(panic_payload) => {
let panic_id = PANIC_COUNTER.fetch_add(1, Ordering::Relaxed);
let context = self.create_panic_context(panic_id, location, &panic_payload);
self.record_region_panic(&context);
if self.config.enable_panic_logging {
self.report_panic(&context);
}
MetricsProviderPanicExt::record_panic(&*self.metrics, &context);
PanicIsolationResult::Panicked(context)
}
}
}
fn skip_context_for_threshold(
&self,
location: &PanicLocation,
) -> Option<(String, PanicContext)> {
let threshold = self.config.panic_threshold_per_region?;
let region_id = self.location_region(location)?;
let panic_count = {
let guard = self.region_panic_counts.lock();
guard.get(®ion_id).copied().unwrap_or(0)
};
if panic_count < threshold {
return None;
}
let reason = format!(
"region {} exceeded panic threshold {} with {} isolated panics",
region_id, threshold, panic_count
);
let panic_id = PANIC_COUNTER.fetch_add(1, Ordering::Relaxed);
let context = self.create_skip_context(panic_id, location.clone(), reason.clone());
Some((reason, context))
}
fn create_skip_context(
&self,
panic_id: u64,
location: PanicLocation,
reason: String,
) -> PanicContext {
let (region_id, task_id, obligation_id) = self.location_ids(&location);
PanicContext {
panic_id,
location,
timestamp: Instant::now(),
panic_message: Some(reason),
backtrace: None,
region_id,
task_id,
obligation_id,
}
}
fn record_region_panic(&self, context: &PanicContext) {
let Some(region_id) = context.region_id else {
return;
};
let mut guard = self.region_panic_counts.lock();
let count = guard.entry(region_id).or_insert(0);
*count = count.saturating_add(1);
}
fn location_region(&self, location: &PanicLocation) -> Option<RegionId> {
self.location_ids(location).0
}
fn location_ids(
&self,
location: &PanicLocation,
) -> (Option<RegionId>, Option<TaskId>, Option<ObligationId>) {
match location {
PanicLocation::TaskExecution {
task_id, region_id, ..
} => (Some(*region_id), Some(*task_id), None),
PanicLocation::FinalizerExecution { region_id, .. } => (Some(*region_id), None, None),
PanicLocation::RegionCleanup { region_id, .. } => (Some(*region_id), None, None),
PanicLocation::ObligationHandling {
obligation_id,
region_id,
} => (Some(*region_id), None, Some(*obligation_id)),
PanicLocation::SchedulerInternal { .. } => (None, None, None),
}
}
fn create_panic_context(
&self,
panic_id: u64,
location: PanicLocation,
panic_payload: &Box<dyn std::any::Any + Send>,
) -> PanicContext {
let panic_message = if let Some(s) = panic_payload.downcast_ref::<&str>() {
Some((*s).to_string())
} else if let Some(s) = panic_payload.downcast_ref::<String>() {
Some(s.clone())
} else {
Some("Non-string panic payload".to_string())
};
let backtrace = if self.config.capture_backtraces {
Some(format!("{}", Backtrace::force_capture()))
} else {
None
};
let (region_id, task_id, obligation_id) = self.location_ids(&location);
PanicContext {
panic_id,
location,
timestamp: Instant::now(),
panic_message,
backtrace,
region_id,
task_id,
obligation_id,
}
}
#[allow(unused_variables)]
fn report_panic(&self, context: &PanicContext) {
crate::tracing_compat::error!(
panic_id = context.panic_id,
location = ?context.location,
panic_message = ?context.panic_message,
region_id = ?context.region_id,
task_id = ?context.task_id,
obligation_id = ?context.obligation_id,
timestamp = ?context.timestamp,
"panic isolated"
);
if let Some(ref backtrace) = context.backtrace {
crate::tracing_compat::error!(
panic_id = context.panic_id,
backtrace = %backtrace,
"panic backtrace captured"
);
}
}
#[allow(unused_variables)]
fn report_skip(&self, reason: &str, context: &PanicContext) {
crate::tracing_compat::warn!(
panic_id = context.panic_id,
reason,
location = ?context.location,
region_id = ?context.region_id,
task_id = ?context.task_id,
obligation_id = ?context.obligation_id,
timestamp = ?context.timestamp,
"panic isolation skipped operation after threshold escalation"
);
}
pub fn panic_to_outcome(&self, context: &PanicContext) -> Outcome<(), crate::error::Error> {
let panic_payload = PanicPayload::new(format!(
"Task panicked in isolation (ID={}): {}",
context.panic_id,
context.panic_message.as_deref().unwrap_or("unknown")
));
Outcome::Panicked(panic_payload)
}
}
impl fmt::Display for PanicLocation {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
PanicLocation::TaskExecution {
task_id,
region_id,
poll_attempt,
} => {
write!(
f,
"TaskExecution(task={:?}, region={:?}, poll={})",
task_id.0, region_id.0, poll_attempt
)
}
PanicLocation::FinalizerExecution {
region_id,
finalizer_type,
} => {
write!(
f,
"FinalizerExecution(region={:?}, type={:?})",
region_id.0, finalizer_type
)
}
PanicLocation::RegionCleanup {
region_id,
cleanup_phase,
} => {
write!(
f,
"RegionCleanup(region={:?}, phase={:?})",
region_id.0, cleanup_phase
)
}
PanicLocation::ObligationHandling {
obligation_id,
region_id,
} => {
write!(
f,
"ObligationHandling(obligation={:?}, region={:?})",
obligation_id.0, region_id.0
)
}
PanicLocation::SchedulerInternal {
worker_id,
operation,
} => {
if let Some(id) = worker_id {
write!(f, "SchedulerInternal(worker={}, op={})", id, operation)
} else {
write!(f, "SchedulerInternal(op={})", operation)
}
}
}
}
}
pub trait MetricsProviderPanicExt {
fn record_panic(&self, context: &PanicContext);
}
impl<T: ?Sized + MetricsProvider> MetricsProviderPanicExt for T {
fn record_panic(&self, context: &PanicContext) {
let location_tag: &'static str = match &context.location {
PanicLocation::TaskExecution { .. } => "task_execution",
PanicLocation::FinalizerExecution { .. } => "finalizer_execution",
PanicLocation::RegionCleanup { .. } => "region_cleanup",
PanicLocation::ObligationHandling { .. } => "obligation_handling",
PanicLocation::SchedulerInternal { .. } => "scheduler_internal",
};
MetricsProvider::record_panic(self, location_tag);
}
}
#[cfg(test)]
mod tests {
#![allow(
clippy::pedantic,
clippy::nursery,
clippy::expect_fun_call,
clippy::map_unwrap_or,
clippy::cast_possible_wrap,
clippy::future_not_send
)]
use super::*;
use crate::observability::metrics::NoOpMetrics;
use crate::types::{RegionId, TaskId};
use crate::util::ArenaIndex;
use std::sync::Mutex as StdMutex;
#[derive(Default)]
struct CapturingMetrics {
panics: StdMutex<Vec<&'static str>>,
tasks_spawned: StdMutex<Vec<(RegionId, TaskId)>>,
tasks_completed: StdMutex<
Vec<(
TaskId,
crate::observability::metrics::OutcomeKind,
std::time::Duration,
)>,
>,
regions_created: StdMutex<Vec<(RegionId, Option<RegionId>)>>,
regions_closed: StdMutex<Vec<(RegionId, std::time::Duration)>>,
cancellation_requests: StdMutex<Vec<(RegionId, crate::types::CancelKind)>>,
drain_completions: StdMutex<Vec<(RegionId, std::time::Duration)>>,
obligations_created: StdMutex<Vec<RegionId>>,
obligations_discharged: StdMutex<Vec<RegionId>>,
obligations_leaked: StdMutex<Vec<RegionId>>,
}
impl CapturingMetrics {
fn tasks_spawned(&self) -> Vec<(RegionId, TaskId)> {
self.tasks_spawned
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.clone()
}
fn regions_created(&self) -> Vec<(RegionId, Option<RegionId>)> {
self.regions_created
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.clone()
}
fn regions_closed(&self) -> Vec<(RegionId, std::time::Duration)> {
self.regions_closed
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.clone()
}
fn obligations_created(&self) -> Vec<RegionId> {
self.obligations_created
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.clone()
}
fn cancellation_requests(&self) -> Vec<(RegionId, crate::types::CancelKind)> {
self.cancellation_requests
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.clone()
}
#[allow(dead_code)]
fn panics_captured(&self) -> Vec<&'static str> {
self.panics
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.clone()
}
}
impl crate::observability::metrics::MetricsProvider for CapturingMetrics {
fn task_spawned(&self, region_id: RegionId, task_id: TaskId) {
self.tasks_spawned
.lock()
.unwrap()
.push((region_id, task_id));
}
fn task_completed(
&self,
task_id: TaskId,
outcome: crate::observability::metrics::OutcomeKind,
duration: std::time::Duration,
) {
self.tasks_completed
.lock()
.unwrap()
.push((task_id, outcome, duration));
}
fn region_created(&self, region_id: RegionId, parent_id: Option<RegionId>) {
self.regions_created
.lock()
.unwrap()
.push((region_id, parent_id));
}
fn region_closed(&self, region_id: RegionId, duration: std::time::Duration) {
self.regions_closed
.lock()
.unwrap()
.push((region_id, duration));
}
fn cancellation_requested(
&self,
region_id: RegionId,
cancel_kind: crate::types::CancelKind,
) {
self.cancellation_requests
.lock()
.unwrap()
.push((region_id, cancel_kind));
}
fn drain_completed(&self, region_id: RegionId, duration: std::time::Duration) {
self.drain_completions
.lock()
.unwrap()
.push((region_id, duration));
}
fn deadline_set(&self, __region_id: RegionId, __duration: std::time::Duration) {
}
fn deadline_exceeded(&self, __region_id: RegionId) {
}
fn deadline_warning(
&self,
_context: &str,
_location: &'static str,
_remaining: std::time::Duration,
) {
}
fn deadline_violation(&self, _context: &str, _elapsed: std::time::Duration) {
}
fn deadline_remaining(&self, _context: &str, _remaining: std::time::Duration) {
}
fn checkpoint_interval(&self, _context: &str, _interval: std::time::Duration) {
}
fn task_stuck_detected(&self, _task_context: &str) {
}
fn obligation_created(&self, region_id: RegionId) {
self.obligations_created
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.push(region_id);
}
fn obligation_discharged(&self, region_id: RegionId) {
self.obligations_discharged
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.push(region_id);
}
fn obligation_leaked(&self, region_id: RegionId) {
self.obligations_leaked
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.push(region_id);
}
fn scheduler_tick(&self, _ready_count: usize, _tick_duration: std::time::Duration) {
}
fn record_panic(&self, location: &'static str) {
self.panics
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.push(location);
}
}
#[test]
fn test_panic_isolation_success() {
let config = PanicIsolationConfig::default();
let metrics = Arc::new(NoOpMetrics);
let isolator = PanicIsolator::new(config, metrics);
let result = isolator.isolate_task_execution(
TaskId::from_arena(ArenaIndex::new(1, 0)),
RegionId::from_arena(ArenaIndex::new(1, 0)),
1,
|| 42,
);
assert!(result.is_success());
assert_eq!(result.into_success(), Some(42));
}
#[test]
fn test_panic_isolation_catches_panic() {
let config = PanicIsolationConfig::default();
let metrics = Arc::new(NoOpMetrics);
let isolator = PanicIsolator::new(config, metrics);
let result = isolator.isolate_task_execution(
TaskId::from_arena(ArenaIndex::new(1, 0)),
RegionId::from_arena(ArenaIndex::new(1, 0)),
1,
|| panic!("test panic"),
);
assert!(result.is_panicked());
if let PanicIsolationResult::Panicked(context) = result {
assert_eq!(context.panic_message, Some("test panic".to_string()));
assert!(matches!(
context.location,
PanicLocation::TaskExecution { .. }
));
}
}
#[test]
fn test_panic_context_creation() {
let config = PanicIsolationConfig {
capture_backtraces: true,
..Default::default()
};
let metrics = Arc::new(NoOpMetrics);
let isolator = PanicIsolator::new(config, metrics);
let result = isolator.isolate_finalizer_execution(
RegionId::from_arena(ArenaIndex::new(2, 0)),
FinalizerType::Sync,
|| panic!("finalizer panic"),
);
if let PanicIsolationResult::Panicked(context) = result {
assert!(context.backtrace.is_some());
assert_eq!(
context.region_id,
Some(RegionId::from_arena(ArenaIndex::new(2, 0)))
);
assert!(matches!(
context.location,
PanicLocation::FinalizerExecution { .. }
));
} else {
panic!("Expected panicked result");
}
}
#[test]
fn test_panic_to_outcome_conversion() {
let config = PanicIsolationConfig::default();
let metrics = Arc::new(NoOpMetrics);
let isolator = PanicIsolator::new(config, metrics);
let context = PanicContext {
panic_id: 1,
location: PanicLocation::TaskExecution {
task_id: TaskId::from_arena(ArenaIndex::new(1, 0)),
region_id: RegionId::from_arena(ArenaIndex::new(1, 0)),
poll_attempt: 1,
},
timestamp: Instant::now(),
panic_message: Some("test panic".to_string()),
backtrace: None,
region_id: Some(RegionId::from_arena(ArenaIndex::new(1, 0))),
task_id: Some(TaskId::from_arena(ArenaIndex::new(1, 0))),
obligation_id: None,
};
let outcome = isolator.panic_to_outcome(&context);
assert!(matches!(outcome, Outcome::Panicked(_)));
}
#[test]
fn test_disabled_isolation() {
let config = PanicIsolationConfig {
isolate_task_panics: false,
..Default::default()
};
let metrics = Arc::new(NoOpMetrics);
let isolator = PanicIsolator::new(config, metrics);
let result = isolator.isolate_task_execution(
TaskId::from_arena(ArenaIndex::new(1, 0)),
RegionId::from_arena(ArenaIndex::new(1, 0)),
1,
|| 42,
);
assert!(result.is_success());
assert_eq!(result.into_success(), Some(42));
}
#[test]
fn test_region_panic_threshold_skips_followup_operations() {
let config = PanicIsolationConfig {
panic_threshold_per_region: Some(1),
capture_backtraces: false,
..Default::default()
};
let metrics = Arc::new(NoOpMetrics);
let isolator = PanicIsolator::new(config, metrics);
let task_id = TaskId::from_arena(ArenaIndex::new(1, 0));
let region_id = RegionId::from_arena(ArenaIndex::new(7, 0));
let first = isolator.isolate_task_execution(task_id, region_id, 1, || panic!("boom"));
assert!(matches!(first, PanicIsolationResult::Panicked(_)));
let second = isolator.isolate_task_execution(task_id, region_id, 2, || 99);
match second {
PanicIsolationResult::Skipped { reason, context } => {
assert!(reason.contains("exceeded panic threshold 1"));
assert_eq!(context.region_id, Some(region_id));
assert_eq!(context.task_id, Some(task_id));
assert_eq!(context.panic_message.as_deref(), Some(reason.as_str()));
}
other => panic!("expected skipped result, got {:?}", other),
}
}
#[test]
fn test_panic_threshold_isolated_per_region() {
let config = PanicIsolationConfig {
panic_threshold_per_region: Some(1),
capture_backtraces: false,
..Default::default()
};
let metrics = Arc::new(NoOpMetrics);
let isolator = PanicIsolator::new(config, metrics);
let task_id = TaskId::from_arena(ArenaIndex::new(1, 0));
let region_a = RegionId::from_arena(ArenaIndex::new(8, 0));
let region_b = RegionId::from_arena(ArenaIndex::new(9, 0));
let first = isolator.isolate_task_execution(task_id, region_a, 1, || panic!("boom"));
assert!(matches!(first, PanicIsolationResult::Panicked(_)));
let other_region = isolator.isolate_task_execution(task_id, region_b, 1, || 7);
assert!(matches!(other_region, PanicIsolationResult::Success(7)));
}
#[test]
fn record_panic_routes_to_metrics_provider() {
use std::sync::Mutex as StdMutex;
#[derive(Default)]
struct CapturingMetrics {
panics: StdMutex<Vec<&'static str>>,
tasks_spawned: StdMutex<Vec<(RegionId, TaskId)>>,
tasks_completed: StdMutex<
Vec<(
TaskId,
crate::observability::metrics::OutcomeKind,
std::time::Duration,
)>,
>,
regions_created: StdMutex<Vec<(RegionId, Option<RegionId>)>>,
regions_closed: StdMutex<Vec<(RegionId, std::time::Duration)>>,
cancellation_requests: StdMutex<Vec<(RegionId, crate::types::CancelKind)>>,
drain_completions: StdMutex<Vec<(RegionId, std::time::Duration)>>,
obligations_created: StdMutex<Vec<RegionId>>,
obligations_discharged: StdMutex<Vec<RegionId>>,
obligations_leaked: StdMutex<Vec<RegionId>>,
}
#[allow(dead_code)]
impl CapturingMetrics {
fn tasks_spawned(&self) -> Vec<(RegionId, TaskId)> {
self.tasks_spawned
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.clone()
}
fn regions_created(&self) -> Vec<(RegionId, Option<RegionId>)> {
self.regions_created
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.clone()
}
fn regions_closed(&self) -> Vec<(RegionId, std::time::Duration)> {
self.regions_closed
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.clone()
}
fn obligations_created(&self) -> Vec<RegionId> {
self.obligations_created
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.clone()
}
fn obligations_leaked(&self) -> Vec<RegionId> {
self.obligations_leaked
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.clone()
}
fn cancellation_requests(&self) -> Vec<(RegionId, crate::types::CancelKind)> {
self.cancellation_requests
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.clone()
}
}
impl crate::observability::metrics::MetricsProvider for CapturingMetrics {
fn task_spawned(&self, region_id: RegionId, task_id: TaskId) {
self.tasks_spawned
.lock()
.unwrap()
.push((region_id, task_id));
}
fn task_completed(
&self,
task_id: TaskId,
outcome_kind: crate::observability::metrics::OutcomeKind,
duration: std::time::Duration,
) {
self.tasks_completed
.lock()
.unwrap()
.push((task_id, outcome_kind, duration));
}
fn region_created(&self, region_id: RegionId, parent_id: Option<RegionId>) {
self.regions_created
.lock()
.unwrap()
.push((region_id, parent_id));
}
fn region_closed(&self, region_id: RegionId, duration: std::time::Duration) {
self.regions_closed
.lock()
.unwrap()
.push((region_id, duration));
}
fn cancellation_requested(
&self,
region_id: RegionId,
cancel_kind: crate::types::CancelKind,
) {
self.cancellation_requests
.lock()
.unwrap()
.push((region_id, cancel_kind));
}
fn drain_completed(&self, region_id: RegionId, duration: std::time::Duration) {
self.drain_completions
.lock()
.unwrap()
.push((region_id, duration));
}
fn deadline_set(&self, _region_id: RegionId, _duration: std::time::Duration) {
}
fn deadline_exceeded(&self, _region_id: RegionId) {
}
fn deadline_warning(
&self,
_context: &str,
_location: &'static str,
_remaining: std::time::Duration,
) {
}
fn deadline_violation(&self, _context: &str, _elapsed: std::time::Duration) {
}
fn deadline_remaining(&self, _context: &str, _remaining: std::time::Duration) {
}
fn checkpoint_interval(&self, _context: &str, _interval: std::time::Duration) {
}
fn task_stuck_detected(&self, _task_context: &str) {
}
fn obligation_created(&self, region_id: RegionId) {
self.obligations_created
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.push(region_id);
}
fn obligation_discharged(&self, region_id: RegionId) {
self.obligations_discharged
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.push(region_id);
}
fn obligation_leaked(&self, region_id: RegionId) {
self.obligations_leaked
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.push(region_id);
}
fn scheduler_tick(&self, _ready_count: usize, _tick_duration: std::time::Duration) {
}
fn record_panic(&self, location: &'static str) {
self.panics
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.push(location);
}
}
let metrics = CapturingMetrics::default();
let task_id = TaskId::from_arena(ArenaIndex::new(1, 0));
let region_id = RegionId::from_arena(ArenaIndex::new(2, 0));
let cases: Vec<(PanicLocation, &'static str)> = vec![
(
PanicLocation::TaskExecution {
task_id,
region_id,
poll_attempt: 1,
},
"task_execution",
),
(
PanicLocation::FinalizerExecution {
region_id,
finalizer_type: FinalizerType::Sync,
},
"finalizer_execution",
),
(
PanicLocation::RegionCleanup {
region_id,
cleanup_phase: CleanupPhase::Finalizers,
},
"region_cleanup",
),
(
PanicLocation::ObligationHandling {
obligation_id: ObligationId::from_arena(ArenaIndex::new(3, 0)),
region_id,
},
"obligation_handling",
),
(
PanicLocation::SchedulerInternal {
worker_id: Some(0),
operation: "test".to_string(),
},
"scheduler_internal",
),
];
for (location, _expected) in &cases {
let ctx = PanicContext {
panic_id: 0,
location: location.clone(),
timestamp: Instant::now(),
panic_message: Some("test".to_string()),
backtrace: None,
region_id: Some(region_id),
task_id: Some(task_id),
obligation_id: None,
};
<CapturingMetrics as MetricsProviderPanicExt>::record_panic(&metrics, &ctx);
}
let observed: Vec<&'static str> = metrics
.panics
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.clone();
let expected: Vec<&'static str> = cases.iter().map(|(_, t)| *t).collect();
assert_eq!(
observed, expected,
"every PanicLocation must route to a record_panic call with the canonical tag"
);
}
#[test]
fn instrumentation_callbacks_capture_runtime_lifecycle_events() {
let metrics = Arc::new(CapturingMetrics::default());
let task_id = TaskId::from_arena(ArenaIndex::new(42, 1));
let region_id = RegionId::from_arena(ArenaIndex::new(100, 2));
let parent_region_id = RegionId::from_arena(ArenaIndex::new(99, 1));
let metrics_ref = &*metrics;
metrics_ref.region_created(region_id, Some(parent_region_id));
metrics_ref.task_spawned(region_id, task_id);
metrics_ref.obligation_created(region_id);
metrics_ref.task_completed(
task_id,
crate::observability::metrics::OutcomeKind::Ok,
std::time::Duration::from_millis(100),
);
metrics_ref.cancellation_requested(region_id, crate::types::CancelKind::User);
metrics_ref.obligation_discharged(region_id);
metrics_ref.region_closed(region_id, std::time::Duration::from_millis(500));
let regions_created = metrics.regions_created();
assert_eq!(regions_created.len(), 1);
assert_eq!(regions_created[0], (region_id, Some(parent_region_id)));
let tasks_spawned = metrics.tasks_spawned();
assert_eq!(tasks_spawned.len(), 1);
assert_eq!(tasks_spawned[0], (region_id, task_id));
let regions_closed = metrics.regions_closed();
assert_eq!(regions_closed.len(), 1);
assert_eq!(regions_closed[0].0, region_id);
assert_eq!(regions_closed[0].1, std::time::Duration::from_millis(500));
let obligations_created = metrics.obligations_created();
assert_eq!(obligations_created.len(), 1);
assert_eq!(obligations_created[0], region_id);
let cancellation_requests = metrics.cancellation_requests();
assert_eq!(cancellation_requests.len(), 1);
assert_eq!(
cancellation_requests[0],
(region_id, crate::types::CancelKind::User)
);
}
}