use crate::runtime::scheduler::priority::DispatchLane;
use crate::runtime::scheduler::worker::WorkerId;
use crate::types::TaskId;
use parking_lot::RwLock;
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
/// Numeric task priority used by the inversion checks in this file.
///
/// A waiter whose value is numerically greater than the resource owner's is
/// treated as the higher-priority task.
pub type Priority = u8;
/// Opaque identifier for a resource tracked by the oracle.
///
/// The wrapped value is private; build one with [`ResourceId::new`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ResourceId(u64);

impl ResourceId {
    /// Wraps the raw numeric `id` in a `ResourceId`.
    #[must_use]
    pub fn new(id: u64) -> Self {
        ResourceId(id)
    }
}
/// One recorded priority inversion: a task waiting on a resource whose owner
/// has a numerically lower priority.
#[derive(Debug, Clone)]
pub struct PriorityInversion {
/// Identifier taken from the oracle's inversion counter.
pub inversion_id: InversionId,
/// The waiting (higher-priority) task.
pub blocked_task: TaskId,
/// Priority of `blocked_task` at detection time.
pub blocked_priority: Priority,
/// The task recorded as owner of `resource`.
pub blocking_task: TaskId,
/// Priority of `blocking_task` at detection time.
pub blocking_priority: Priority,
/// The resource the blocked task is waiting for.
pub resource: ResourceId,
/// `Instant::now()` captured when the inversion was recorded.
pub start_time: Instant,
/// `None` while the inversion is active; set to the measured length when the
/// inversion ends and is kept in history.
pub duration: Option<Duration>,
/// How the inversion was detected.
pub inversion_type: InversionType,
/// Tasks involved: `[blocked, blocking]` for direct inversions, the traversed
/// blocking path for chain inversions.
pub task_chain: Vec<TaskId>,
/// Estimated impact; see [`InversionImpact`].
pub impact: InversionImpact,
}
/// Classification of how an inversion arose.
///
/// The variant's `Debug` name is used as the key of `InversionStats::by_type`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum InversionType {
/// The waiter is blocked directly by the lower-priority resource owner.
Direct,
/// The waiter is blocked transitively through a chain of blocked owners.
Chain,
/// NOTE(review): never constructed in this file — presumably reserved for
/// inversions without a bounded duration; confirm against callers.
Unbounded,
/// NOTE(review): never constructed in this file — presumably reserved for
/// inversions caused by work stealing; confirm against callers.
WorkStealing,
}
/// Estimated impact attached to a [`PriorityInversion`].
#[derive(Debug, Clone)]
pub struct InversionImpact {
/// Delay in microseconds. Zero at detection; overwritten with the measured
/// duration when the inversion ends and impact analysis is enabled.
pub delay_us: u64,
/// Number of tasks considered affected at detection time.
pub affected_tasks: usize,
/// Severity bucket; re-classified when the inversion ends and impact
/// analysis is enabled.
pub severity: InversionSeverity,
/// Heuristic score: priority difference times 0.1, capped at 1.0
/// (0.0 when impact analysis is disabled).
pub throughput_impact: f64,
/// Heuristic score: priority difference times 0.05, capped at 1.0
/// (0.0 when impact analysis is disabled).
pub fairness_impact: f64,
}
/// Severity bucket of an inversion.
///
/// `PartialOrd`/`Ord` are derived, so variants compare in declaration order:
/// `Minor < Moderate < Severe < Critical`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum InversionSeverity {
/// Lowest severity.
Minor,
/// Second-lowest severity.
Moderate,
/// Second-highest severity.
Severe,
/// Highest severity.
Critical,
}
/// Opaque identifier for a recorded priority inversion.
///
/// The wrapped value is private; build one with [`InversionId::new`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct InversionId(u64);

impl InversionId {
    /// Wraps the raw numeric `id` in an `InversionId`.
    ///
    /// Marked `#[must_use]` like `ResourceId::new`: constructing an id and
    /// discarding it is always a mistake.
    #[must_use]
    pub fn new(id: u64) -> Self {
        Self(id)
    }
}
/// Per-task bookkeeping kept by the oracle for every spawned task.
///
/// `dead_code` is allowed because several fields (`task_id`, `lane`,
/// `worker_id`, `start_time`, `blocking_tasks`) are stored but never read in
/// this file.
#[derive(Debug, Clone)]
#[allow(dead_code)] struct TaskState {
/// Identifier of the tracked task (also the key in the task map).
task_id: TaskId,
/// Priority supplied when the task was spawned.
priority: Priority,
/// Dispatch lane supplied when the task was spawned.
lane: DispatchLane,
/// Worker supplied when the task was spawned, if any.
worker_id: Option<WorkerId>,
/// `Instant::now()` captured when the task was registered.
start_time: Instant,
/// Resource the task is currently recorded as waiting for, if any.
blocked_on: Option<ResourceId>,
/// Initialised empty and not updated anywhere in this file.
blocking_tasks: HashSet<TaskId>,
/// Resources the task is currently recorded as holding.
held_resources: HashSet<ResourceId>,
}
/// Per-resource bookkeeping: the current owner and the FIFO queue of waiters.
///
/// `dead_code` is allowed because `resource_id` and `creation_time` are stored
/// but never read in this file.
#[derive(Debug, Clone)]
#[allow(dead_code)] struct ResourceState {
/// Identifier of the resource (also the key in the resource map).
resource_id: ResourceId,
/// Task currently recorded as owning the resource, if any.
owner: Option<TaskId>,
/// Tasks recorded as waiting, in arrival order; released resources are handed
/// to the front of this queue.
waiters: VecDeque<TaskId>,
/// `Instant::now()` captured when the entry was first created.
creation_time: Instant,
}
/// Aggregate counters describing the inversions seen by the oracle.
#[derive(Debug, Clone)]
pub struct InversionStats {
    /// Number of inversions recorded since creation or the last reset.
    pub total_inversions: u64,
    /// Number of inversions currently counted as active.
    pub active_inversions: u64,
    /// `total_delay_us / total_inversions`, refreshed when an inversion with a
    /// measured duration is resolved.
    pub avg_duration_us: u64,
    /// Longest measured inversion, in microseconds.
    pub max_duration_us: u64,
    /// Sum of all measured inversion durations, in microseconds.
    pub total_delay_us: u64,
    /// Inversion count keyed by the `Debug` name of the inversion type.
    pub by_type: HashMap<String, u64>,
    /// Inversion count keyed by the `Debug` name of the severity at detection.
    pub by_severity: HashMap<String, u64>,
    /// Running sum of each recorded inversion's `impact.affected_tasks`.
    pub affected_tasks: u64,
    /// Health score in `[0.0, 1.0]`; 1.0 means no active inversions and no
    /// accumulated delay.
    pub priority_health: f64,
}

impl Default for InversionStats {
    /// Returns zeroed counters with a full health score.
    ///
    /// `priority_health` starts at 1.0 rather than the derived 0.0 so that an
    /// oracle that has seen no inversions does not report the worst possible
    /// score; 1.0 is also what the health formula yields for zero active
    /// inversions and zero delay.
    fn default() -> Self {
        Self {
            total_inversions: 0,
            active_inversions: 0,
            avg_duration_us: 0,
            max_duration_us: 0,
            total_delay_us: 0,
            by_type: HashMap::new(),
            by_severity: HashMap::new(),
            affected_tasks: 0,
            priority_health: 1.0,
        }
    }
}
/// Tracks tasks, resources and the waits between them in order to record
/// priority inversions.
///
/// Every table is guarded by its own `RwLock`. The tracking methods take the
/// `resources` lock before the `tasks` lock.
pub struct PriorityInversionOracle {
/// State of every task registered via `track_task_spawned`.
tasks: Arc<RwLock<HashMap<TaskId, TaskState>>>,
/// State of every resource that has been acquired or waited on.
resources: Arc<RwLock<HashMap<ResourceId, ResourceState>>>,
/// Inversions that have been detected and not yet ended.
active_inversions: Arc<RwLock<HashMap<InversionId, PriorityInversion>>>,
/// Ended inversions, oldest first, capped at `config.max_history_size`.
historical_inversions: Arc<RwLock<VecDeque<PriorityInversion>>>,
/// Aggregate counters returned by `get_stats`.
stats: Arc<RwLock<InversionStats>>,
/// Source of the next `InversionId`; starts at 1.
next_inversion_id: AtomicU64,
/// Source of the next `ResourceId` handed out by `create_resource`; starts at 1.
next_resource_id: AtomicU64,
/// Detection settings fixed at construction time.
config: InversionOracleConfig,
}
/// Tunable settings for [`PriorityInversionOracle`].
#[derive(Debug, Clone)]
pub struct InversionOracleConfig {
    /// Inversions that end sooner than this many microseconds are not kept in
    /// the history.
    pub min_inversion_duration_us: u64,
    /// Maximum number of ended inversions retained in the history.
    pub max_history_size: usize,
    /// Whether a chain search runs after each direct inversion is recorded.
    pub detect_chain_inversions: bool,
    /// Minimum priority difference (blocked minus blocking) that counts as an
    /// inversion.
    pub priority_threshold: u8,
    /// Whether impact figures are computed for recorded inversions.
    pub enable_impact_analysis: bool,
}

impl Default for InversionOracleConfig {
    /// Defaults: 100 µs minimum duration, 1000 history entries, threshold 1,
    /// chain detection and impact analysis both enabled.
    fn default() -> Self {
        let min_inversion_duration_us = 100;
        let max_history_size = 1000;
        let priority_threshold = 1;
        Self {
            min_inversion_duration_us,
            max_history_size,
            detect_chain_inversions: true,
            priority_threshold,
            enable_impact_analysis: true,
        }
    }
}
impl PriorityInversionOracle {
/// Creates an oracle that uses `InversionOracleConfig::default()`.
#[must_use]
pub fn new() -> Self {
    let defaults = InversionOracleConfig::default();
    Self::with_config(defaults)
}
/// Creates an oracle with empty tracking tables and the supplied `config`.
///
/// Both id counters start at 1.
#[must_use]
pub fn with_config(config: InversionOracleConfig) -> Self {
    let tasks = Arc::new(RwLock::new(HashMap::new()));
    let resources = Arc::new(RwLock::new(HashMap::new()));
    let active_inversions = Arc::new(RwLock::new(HashMap::new()));
    let historical_inversions = Arc::new(RwLock::new(VecDeque::new()));
    let stats = Arc::new(RwLock::new(InversionStats::default()));
    Self {
        tasks,
        resources,
        active_inversions,
        historical_inversions,
        stats,
        next_inversion_id: AtomicU64::new(1),
        next_resource_id: AtomicU64::new(1),
        config,
    }
}
/// Registers a newly spawned task with the given priority, lane and worker.
///
/// The task starts with no held resources and is not blocked. An existing
/// entry for the same `task_id` is replaced.
pub fn track_task_spawned(
    &self,
    task_id: TaskId,
    priority: Priority,
    lane: DispatchLane,
    worker_id: Option<WorkerId>,
) {
    // Build the entry before taking the lock so the write section stays short.
    let spawned_at = Instant::now();
    let entry = TaskState {
        task_id,
        priority,
        lane,
        worker_id,
        start_time: spawned_at,
        blocked_on: None,
        blocking_tasks: HashSet::new(),
        held_resources: HashSet::new(),
    };
    let mut registry = self.tasks.write();
    registry.insert(task_id, entry);
}
/// Removes a finished task from tracking and cleans up after it.
///
/// In order: the task is removed from the task table, taken out of the waiter
/// queue of the resource it was blocked on (if any), every resource it still
/// held is released, and every active inversion naming it is ended. Unknown
/// task ids are ignored.
pub fn track_task_completed(&self, task_id: TaskId) {
    // The write guard is a temporary and is released at the end of this
    // statement, before any other lock is taken.
    let removed = self.tasks.write().remove(&task_id);
    let task_state = match removed {
        Some(state) => state,
        None => return,
    };

    // A task that finishes while still queued must leave the queue; otherwise
    // a later release could hand the resource to a task that no longer exists.
    if let Some(resource_id) = task_state.blocked_on {
        if let Some(resource_state) = self.resources.write().get_mut(&resource_id) {
            resource_state.waiters.retain(|waiter| *waiter != task_id);
        }
    }

    for resource_id in task_state.held_resources {
        self.release_resource(task_id, resource_id);
    }
    self.end_inversions_for_task(task_id);
}
/// Records that `task_id` now owns `resource_id`.
///
/// The resource entry is created on first use. The acquiring task is marked
/// as owner, removed from the resource's waiter queue, and (if tracked) gets
/// the resource added to its held set and its blocked state cleared. Active
/// inversions on the resource are then ended.
pub fn track_resource_acquired(&self, task_id: TaskId, resource_id: ResourceId) {
    {
        // Lock order: `resources` before `tasks`.
        let mut resources = self.resources.write();
        let mut tasks = self.tasks.write();

        let resource_state = resources
            .entry(resource_id)
            .or_insert_with(|| ResourceState {
                resource_id,
                owner: None,
                waiters: VecDeque::new(),
                creation_time: Instant::now(),
            });
        resource_state.owner = Some(task_id);
        // The new owner is no longer waiting. Leaving it queued would let a
        // later release pop the stale entry and hand the resource back to it.
        resource_state.waiters.retain(|waiter| *waiter != task_id);

        if let Some(task_state) = tasks.get_mut(&task_id) {
            task_state.held_resources.insert(resource_id);
            task_state.blocked_on = None;
        }
    }
    // Both guards are released before inversion bookkeeping takes its own locks.
    self.check_inversion_resolution(resource_id);
}
/// Records that `task_id` is waiting for `resource_id`.
///
/// The task is appended to the resource's waiter queue and, if tracked, marked
/// as blocked on it. When both the waiter and a current owner are known, a
/// direct-inversion check runs after the locks are released.
pub fn track_resource_waiting(&self, task_id: TaskId, resource_id: ResourceId) {
    let (waiting_task_priority, owner_task_id) = {
        // Lock order: `resources` before `tasks`.
        let mut resources = self.resources.write();
        let mut tasks = self.tasks.write();

        let entry = resources
            .entry(resource_id)
            .or_insert_with(|| ResourceState {
                resource_id,
                owner: None,
                waiters: VecDeque::new(),
                creation_time: Instant::now(),
            });
        entry.waiters.push_back(task_id);

        // Mark the task blocked and read its priority in a single lookup.
        let priority = tasks.get_mut(&task_id).map(|state| {
            state.blocked_on = Some(resource_id);
            state.priority
        });
        (priority, entry.owner)
    };

    if let Some((waiting_priority, owner_id)) = waiting_task_priority.zip(owner_task_id) {
        self.detect_direct_inversion(task_id, waiting_priority, owner_id, resource_id);
    }
}
/// Records that `task_id` released `resource_id`.
///
/// Ownership passes to the waiter at the front of the queue, if any; that
/// waiter (when tracked) is unblocked and gains the resource in its held set.
/// The resource is then removed from the releasing task's held set and active
/// inversions on the resource are ended.
pub fn release_resource(&self, task_id: TaskId, resource_id: ResourceId) {
    {
        // Lock order: `resources` before `tasks`.
        let mut resources = self.resources.write();
        let mut tasks = self.tasks.write();

        if let Some(state) = resources.get_mut(&resource_id) {
            // The front waiter, if there is one, becomes the owner; otherwise
            // the resource is left unowned.
            state.owner = state.waiters.pop_front();
            if let Some(successor) = state.owner {
                if let Some(successor_state) = tasks.get_mut(&successor) {
                    successor_state.blocked_on = None;
                    successor_state.held_resources.insert(resource_id);
                }
            }
        }

        if let Some(releaser) = tasks.get_mut(&task_id) {
            releaser.held_resources.remove(&resource_id);
        }
    }
    self.check_inversion_resolution(resource_id);
}
/// Records a direct inversion when `blocked_task` outranks the owner of the
/// resource it waits on by at least `config.priority_threshold`.
///
/// Does nothing if the owner is not tracked or the priority gap is too small.
/// After recording, a chain search runs when `config.detect_chain_inversions`
/// is set.
fn detect_direct_inversion(
    &self,
    blocked_task: TaskId,
    blocked_priority: Priority,
    blocking_task: TaskId,
    resource: ResourceId,
) {
    // The read guard is a temporary and is released at the end of this statement.
    let owner_priority = self
        .tasks
        .read()
        .get(&blocking_task)
        .map(|state| state.priority);
    let blocking_priority = match owner_priority {
        Some(priority) => priority,
        None => return,
    };

    // `||` short-circuits, so the subtraction only runs when it cannot underflow.
    let gap_too_small = blocked_priority <= blocking_priority
        || (blocked_priority - blocking_priority) < self.config.priority_threshold;
    if gap_too_small {
        return;
    }

    let raw_id = self.next_inversion_id.fetch_add(1, Ordering::Relaxed);
    let inversion_id = InversionId::new(raw_id);
    let start_time = Instant::now();

    let impact = if !self.config.enable_impact_analysis {
        InversionImpact {
            delay_us: 0,
            affected_tasks: 0,
            severity: InversionSeverity::Minor,
            throughput_impact: 0.0,
            fairness_impact: 0.0,
        }
    } else {
        self.analyze_inversion_impact(
            blocked_task,
            blocking_task,
            blocked_priority,
            blocking_priority,
        )
    };

    let inversion = PriorityInversion {
        inversion_id,
        blocked_task,
        blocked_priority,
        blocking_task,
        blocking_priority,
        resource,
        start_time,
        duration: None,
        inversion_type: InversionType::Direct,
        task_chain: vec![blocked_task, blocking_task],
        impact,
    };

    {
        let mut active = self.active_inversions.write();
        active.insert(inversion_id, inversion.clone());
    }
    self.update_stats_for_new_inversion(&inversion);

    if self.config.detect_chain_inversions {
        self.detect_chain_inversions(blocked_task, blocked_priority);
    }
}
/// Searches for transitive blocking chains starting at `original_blocked`.
///
/// Takes read snapshots of the resource and task tables and walks them with
/// `find_blocking_chains`.
fn detect_chain_inversions(&self, original_blocked: TaskId, original_priority: Priority) {
    // Lock order matters: the tracking methods take `resources` before
    // `tasks`. Taking them in the opposite order here could deadlock against a
    // writer that holds `resources` and is waiting for `tasks`.
    let resources = self.resources.read();
    let tasks = self.tasks.read();

    let mut visited = HashSet::new();
    let mut chain = Vec::new();
    self.find_blocking_chains(
        &tasks,
        &resources,
        original_blocked,
        original_priority,
        &mut visited,
        &mut chain,
    );
}
/// Depth-first walk along "task is blocked on a resource owned by another
/// task" edges, recording a chain inversion for every transitive owner that
/// the original waiter outranks by at least `config.priority_threshold`.
///
/// * `tasks` / `resources` — table snapshots already locked by the caller.
/// * `current_task` — task whose blocker is examined at this step.
/// * `original_priority` — priority of the task at the head of the chain.
/// * `visited` — tasks already expanded; prevents looping on wait cycles.
/// * `chain` — path from the original waiter to `current_task`; restored to
///   its incoming contents before returning.
fn find_blocking_chains(
    &self,
    tasks: &HashMap<TaskId, TaskState>,
    resources: &HashMap<ResourceId, ResourceState>,
    current_task: TaskId,
    original_priority: Priority,
    visited: &mut HashSet<TaskId>,
    chain: &mut Vec<TaskId>,
) {
    // `insert` returns false when the task was already expanded.
    if !visited.insert(current_task) {
        return;
    }
    chain.push(current_task);

    // Resolve current task -> awaited resource -> tracked owner, if all exist.
    let next_hop = tasks
        .get(&current_task)
        .and_then(|state| state.blocked_on)
        .and_then(|resource_id| {
            let owner_task = resources.get(&resource_id)?.owner?;
            let owner_priority = tasks.get(&owner_task)?.priority;
            Some((resource_id, owner_task, owner_priority))
        });

    if let Some((resource_id, owner_task, owner_priority)) = next_hop {
        // `&&` short-circuits, so the subtraction cannot underflow.
        let outranks_owner = original_priority > owner_priority
            && (original_priority - owner_priority) >= self.config.priority_threshold;

        // The full blocking path is `chain` plus the owner. A path of two
        // tasks is the direct case and is reported elsewhere; three or more
        // (`chain.len() >= 2`) is a chain.
        if outranks_owner && chain.len() >= 2 {
            let mut task_chain = chain.clone();
            task_chain.push(owner_task);

            let inversion_id =
                InversionId::new(self.next_inversion_id.fetch_add(1, Ordering::Relaxed));

            // Impact is computed from the `tasks` snapshot passed in. The
            // caller already holds the task read lock, and re-locking it here
            // could deadlock if a writer is queued in between.
            let impact = if self.config.enable_impact_analysis {
                let priority_diff = original_priority - owner_priority;
                let severity = match priority_diff {
                    1..=2 => InversionSeverity::Minor,
                    3..=5 => InversionSeverity::Moderate,
                    6..=10 => InversionSeverity::Severe,
                    _ => InversionSeverity::Critical,
                };
                InversionImpact {
                    delay_us: 0,
                    affected_tasks: tasks
                        .values()
                        .filter(|t| t.priority >= original_priority)
                        .count(),
                    severity,
                    throughput_impact: (f64::from(priority_diff) * 0.1).min(1.0),
                    fairness_impact: (f64::from(priority_diff) * 0.05).min(1.0),
                }
            } else {
                InversionImpact {
                    delay_us: 0,
                    affected_tasks: task_chain.len(),
                    severity: InversionSeverity::Moderate,
                    throughput_impact: 0.0,
                    fairness_impact: 0.0,
                }
            };

            let inversion = PriorityInversion {
                inversion_id,
                blocked_task: chain[0],
                blocked_priority: original_priority,
                blocking_task: owner_task,
                blocking_priority: owner_priority,
                resource: resource_id,
                start_time: Instant::now(),
                duration: None,
                inversion_type: InversionType::Chain,
                task_chain,
                impact,
            };

            {
                let mut active = self.active_inversions.write();
                active.insert(inversion_id, inversion.clone());
            }
            self.update_stats_for_new_inversion(&inversion);
        }

        self.find_blocking_chains(
            tasks,
            resources,
            owner_task,
            original_priority,
            visited,
            chain,
        );
    }

    chain.pop();
}
fn check_inversion_resolution(&self, resource_id: ResourceId) {
let mut resolved_inversions = Vec::new();
{
let active = self.active_inversions.read();
for (inversion_id, inversion) in active.iter() {
if inversion.resource == resource_id && inversion.duration.is_none() {
resolved_inversions.push(*inversion_id);
}
}
}
for inversion_id in resolved_inversions {
self.end_inversion(inversion_id);
}
}
fn end_inversions_for_task(&self, task_id: TaskId) {
let mut to_resolve = Vec::new();
{
let active = self.active_inversions.read();
for (inversion_id, inversion) in active.iter() {
if (inversion.blocked_task == task_id || inversion.blocking_task == task_id)
&& inversion.duration.is_none()
{
to_resolve.push(*inversion_id);
}
}
}
for inversion_id in to_resolve {
self.end_inversion(inversion_id);
}
}
fn end_inversion(&self, inversion_id: InversionId) {
let mut active = self.active_inversions.write();
if let Some(mut inversion) = active.remove(&inversion_id) {
let end_time = Instant::now();
let duration = end_time.duration_since(inversion.start_time);
if duration.as_micros() as u64 >= self.config.min_inversion_duration_us {
inversion.duration = Some(duration);
if self.config.enable_impact_analysis {
inversion.impact.delay_us = duration.as_micros() as u64;
inversion.impact.severity =
self.classify_inversion_severity(duration, &inversion);
}
let mut history = self.historical_inversions.write();
history.push_back(inversion.clone());
while history.len() > self.config.max_history_size {
history.pop_front();
}
drop(history);
self.update_stats_for_resolved_inversion(&inversion);
}
}
}
/// Builds the detection-time impact estimate for an inversion.
///
/// `affected_tasks` counts tracked tasks whose priority is at least
/// `blocked_priority`. Severity and the two heuristic scores depend only on
/// the priority difference, which is computed with plain `u8` subtraction, so
/// callers must pass `blocked_priority >= blocking_priority`. The task ids are
/// currently unused.
fn analyze_inversion_impact(
    &self,
    _blocked_task: TaskId,
    _blocking_task: TaskId,
    blocked_priority: Priority,
    blocking_priority: Priority,
) -> InversionImpact {
    let tasks = self.tasks.read();
    let affected_tasks = tasks
        .values()
        .filter(|state| state.priority >= blocked_priority)
        .count();

    let gap = blocked_priority - blocking_priority;
    let severity = if (1..=2).contains(&gap) {
        InversionSeverity::Minor
    } else if (3..=5).contains(&gap) {
        InversionSeverity::Moderate
    } else if (6..=10).contains(&gap) {
        InversionSeverity::Severe
    } else {
        InversionSeverity::Critical
    };

    let gap_f = f64::from(gap);
    let throughput_impact = (gap_f * 0.1).min(1.0);
    let fairness_impact = (gap_f * 0.05).min(1.0);

    InversionImpact {
        // The delay is unknown at detection time.
        delay_us: 0,
        affected_tasks,
        severity,
        throughput_impact,
        fairness_impact,
    }
}
/// Classifies a finished inversion from its length in whole milliseconds and
/// its priority difference.
///
/// Up to 1 ms is always `Minor`; above 100 ms is always `Critical`; in between
/// the priority difference picks the bucket. The difference uses plain `u8`
/// subtraction on the stored priorities.
fn classify_inversion_severity(
    &self,
    duration: Duration,
    inversion: &PriorityInversion,
) -> InversionSeverity {
    let millis = duration.as_millis() as u64;
    let gap = inversion.blocked_priority - inversion.blocking_priority;

    match millis {
        0..=1 => InversionSeverity::Minor,
        2..=10 => match gap {
            1..=2 => InversionSeverity::Minor,
            3..=5 => InversionSeverity::Moderate,
            _ => InversionSeverity::Severe,
        },
        11..=100 => match gap {
            1..=2 => InversionSeverity::Moderate,
            _ => InversionSeverity::Severe,
        },
        _ => InversionSeverity::Critical,
    }
}
/// Folds a newly recorded inversion into the aggregate statistics and
/// refreshes the health score.
fn update_stats_for_new_inversion(&self, inversion: &PriorityInversion) {
    // Bucket keys are the `Debug` names of the type and severity.
    let type_key = format!("{:?}", inversion.inversion_type);
    let severity_key = format!("{:?}", inversion.impact.severity);
    let newly_affected = inversion.impact.affected_tasks as u64;

    let mut stats = self.stats.write();
    stats.total_inversions += 1;
    stats.active_inversions += 1;
    *stats.by_type.entry(type_key).or_default() += 1;
    *stats.by_severity.entry(severity_key).or_default() += 1;
    stats.affected_tasks += newly_affected;
    self.update_priority_health(&mut stats);
}
/// Updates the statistics for an inversion that has ended.
///
/// The active counter is decremented (saturating at zero). Delay, maximum and
/// average figures change only when the inversion carries a measured
/// duration. The health score is refreshed either way.
fn update_stats_for_resolved_inversion(&self, inversion: &PriorityInversion) {
    let mut stats = self.stats.write();

    let remaining_active = stats.active_inversions.saturating_sub(1);
    stats.active_inversions = remaining_active;

    let measured_us = inversion.duration.map(|d| d.as_micros() as u64);
    if let Some(duration_us) = measured_us {
        stats.total_delay_us += duration_us;
        if duration_us > stats.max_duration_us {
            stats.max_duration_us = duration_us;
        }
        let recorded = stats.total_inversions;
        if recorded > 0 {
            let total_delay = stats.total_delay_us;
            stats.avg_duration_us = total_delay / recorded;
        }
    }

    self.update_priority_health(&mut stats);
}
/// Recomputes `stats.priority_health`.
///
/// The score is 1.0 minus 0.1 per active inversion minus 0.01 per second of
/// accumulated delay, floored at 0.0.
fn update_priority_health(&self, stats: &mut InversionStats) {
    let active_count = stats.active_inversions as f64;
    let active_penalty = active_count * 0.1;

    let delay_micros = stats.total_delay_us as f64;
    let delay_penalty = delay_micros / 1_000_000.0;

    let raw_score = 1.0 - (active_penalty + delay_penalty * 0.01);
    stats.priority_health = raw_score.max(0.0);
}
/// Hands out the next resource id from the oracle's counter.
pub fn create_resource(&self) -> ResourceId {
    let raw_id = self.next_resource_id.fetch_add(1, Ordering::Relaxed);
    ResourceId::new(raw_id)
}
/// Returns a copy of the current aggregate statistics.
pub fn get_stats(&self) -> InversionStats {
    let snapshot = self.stats.read();
    snapshot.clone()
}
/// Returns copies of all inversions that are currently active, in the active
/// table's iteration order.
pub fn get_active_inversions(&self) -> Vec<PriorityInversion> {
    let active = self.active_inversions.read();
    active.values().cloned().collect()
}
/// Returns copies of the ended inversions kept in history, oldest first.
pub fn get_historical_inversions(&self) -> Vec<PriorityInversion> {
    let history = self.historical_inversions.read();
    history.iter().cloned().collect()
}
/// Clears all tracked state and restarts both id counters at 1.
///
/// Each table is locked, cleared and unlocked in turn, so the reset is not
/// atomic with respect to concurrent tracking calls.
pub fn reset(&self) {
    {
        let mut tasks = self.tasks.write();
        tasks.clear();
    }
    {
        let mut resources = self.resources.write();
        resources.clear();
    }
    {
        let mut active = self.active_inversions.write();
        active.clear();
    }
    {
        let mut history = self.historical_inversions.write();
        history.clear();
    }
    {
        let mut stats = self.stats.write();
        *stats = InversionStats::default();
    }
    // NOTE(review): ids handed out before the reset will be issued again.
    self.next_inversion_id.store(1, Ordering::Relaxed);
    self.next_resource_id.store(1, Ordering::Relaxed);
}
}
impl Default for PriorityInversionOracle {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::TaskId;

    /// A fresh oracle has recorded nothing.
    #[test]
    fn test_oracle_creation() {
        let stats = PriorityInversionOracle::new().get_stats();
        assert_eq!(stats.total_inversions, 0);
        assert_eq!(stats.active_inversions, 0);
    }

    /// Spawning a task stores it with the given priority.
    #[test]
    fn test_task_tracking() {
        let oracle = PriorityInversionOracle::new();
        let id = TaskId::new_for_test(1, 0);

        oracle.track_task_spawned(id, 5, DispatchLane::Ready, None);

        let table = oracle.tasks.read();
        assert!(table.contains_key(&id));
        assert_eq!(table.get(&id).unwrap().priority, 5);
    }

    /// A high-priority waiter behind a low-priority owner is a direct inversion.
    #[test]
    fn test_direct_inversion_detection() {
        let oracle = PriorityInversionOracle::new();
        let high = TaskId::new_for_test(1, 0);
        let low = TaskId::new_for_test(2, 0);
        let resource = oracle.create_resource();

        oracle.track_task_spawned(high, 10, DispatchLane::Ready, None);
        oracle.track_task_spawned(low, 1, DispatchLane::Ready, None);
        oracle.track_resource_acquired(low, resource);
        oracle.track_resource_waiting(high, resource);

        let active = oracle.get_active_inversions();
        assert_eq!(active.len(), 1);

        let found = &active[0];
        assert_eq!(found.blocked_task, high);
        assert_eq!(found.blocking_task, low);
        assert_eq!(found.blocked_priority, 10);
        assert_eq!(found.blocking_priority, 1);
        assert_eq!(found.inversion_type, InversionType::Direct);
    }

    /// Releasing the contended resource ends the inversion.
    #[test]
    fn test_inversion_resolution() {
        let oracle = PriorityInversionOracle::new();
        let high = TaskId::new_for_test(1, 0);
        let low = TaskId::new_for_test(2, 0);
        let resource = oracle.create_resource();

        oracle.track_task_spawned(high, 10, DispatchLane::Ready, None);
        oracle.track_task_spawned(low, 1, DispatchLane::Ready, None);
        oracle.track_resource_acquired(low, resource);
        oracle.track_resource_waiting(high, resource);
        assert_eq!(oracle.get_active_inversions().len(), 1);

        oracle.release_resource(low, resource);
        assert_eq!(oracle.get_active_inversions().len(), 0);

        std::thread::sleep(std::time::Duration::from_millis(1));
        // Whether the inversion reaches history depends on how long it lasted.
        assert!(oracle.get_historical_inversions().len() <= 1);
    }

    /// Equal priorities never count as an inversion.
    #[test]
    fn test_no_inversion_same_priority() {
        let oracle = PriorityInversionOracle::new();
        let first = TaskId::new_for_test(1, 0);
        let second = TaskId::new_for_test(2, 0);
        let resource = oracle.create_resource();

        oracle.track_task_spawned(first, 5, DispatchLane::Ready, None);
        oracle.track_task_spawned(second, 5, DispatchLane::Ready, None);
        oracle.track_resource_acquired(first, resource);
        oracle.track_resource_waiting(second, resource);

        assert_eq!(oracle.get_active_inversions().len(), 0);
    }

    /// Recording an inversion updates the counters and the per-type bucket.
    #[test]
    fn test_statistics_updates() {
        let oracle = PriorityInversionOracle::new();
        let high = TaskId::new_for_test(1, 0);
        let low = TaskId::new_for_test(2, 0);
        let resource = oracle.create_resource();

        oracle.track_task_spawned(high, 10, DispatchLane::Ready, None);
        oracle.track_task_spawned(low, 1, DispatchLane::Ready, None);
        oracle.track_resource_acquired(low, resource);
        oracle.track_resource_waiting(high, resource);

        let stats = oracle.get_stats();
        assert_eq!(stats.total_inversions, 1);
        assert_eq!(stats.active_inversions, 1);
        assert!(stats.by_type.contains_key("Direct"));
    }

    /// Consecutive resource ids differ.
    #[test]
    fn test_resource_id_uniqueness() {
        let oracle = PriorityInversionOracle::new();
        let first = oracle.create_resource();
        let second = oracle.create_resource();
        assert_ne!(first, second);
    }

    /// Completing a task removes it from the task table.
    #[test]
    fn test_task_completion_cleanup() {
        let oracle = PriorityInversionOracle::new();
        let id = TaskId::new_for_test(1, 0);

        oracle.track_task_spawned(id, 5, DispatchLane::Ready, None);
        assert!(oracle.tasks.read().contains_key(&id));

        oracle.track_task_completed(id);
        assert!(!oracle.tasks.read().contains_key(&id));
    }
}