use crate::record::task::TaskRecord;
use crate::types::TaskId;
use crate::util::Arena;
/// Binary max-heap of task ids ordered by scheduling priority, with FIFO
/// ordering among equal-priority tasks (earlier push wins).
///
/// The heap is *intrusive*: each queued task's position is mirrored into its
/// `TaskRecord::heap_index`, so membership checks are O(1) and removal by id
/// is O(log n) without searching the heap array.
#[derive(Debug)]
pub struct IntrusivePriorityHeap {
    // Heap-ordered task ids; the ordering keys (priority, generation) live in
    // the arena-resident `TaskRecord`s, not here.
    heap: Vec<TaskId>,
    // Monotonic stamp handed out at push time to break priority ties FIFO.
    next_generation: u64,
}
impl Default for IntrusivePriorityHeap {
fn default() -> Self {
Self::new()
}
}
impl IntrusivePriorityHeap {
    /// Creates an empty heap.
    #[must_use]
    pub fn new() -> Self {
        Self {
            heap: Vec::new(),
            next_generation: 0,
        }
    }

    /// Creates an empty heap with room for `capacity` tasks before the
    /// backing vector reallocates.
    #[must_use]
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            heap: Vec::with_capacity(capacity),
            next_generation: 0,
        }
    }

    /// Number of tasks currently queued.
    #[must_use]
    #[inline]
    pub fn len(&self) -> usize {
        self.heap.len()
    }

    /// Returns `true` when no tasks are queued.
    #[must_use]
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.heap.is_empty()
    }

    /// Returns the task `pop` would yield next, without removing it.
    #[must_use]
    #[inline]
    pub fn peek(&self) -> Option<TaskId> {
        self.heap.first().copied()
    }

    /// Reports whether `task` is currently a member of this heap.
    ///
    /// A stale `heap_index` in the record (out of bounds, or pointing at a
    /// slot that now holds a different task) is NOT treated as membership.
    #[must_use]
    pub fn contains(&self, task: TaskId, arena: &Arena<TaskRecord>) -> bool {
        arena.get(task.arena_index()).is_some_and(|record| {
            let Some(pos) = record.heap_index else {
                return false;
            };
            let pos = pos as usize;
            // Validate the back-pointer: the slot must exist and hold `task`.
            pos < self.heap.len() && self.heap[pos] == task
        })
    }

    /// Inserts `task` with the given `priority`.
    ///
    /// No-op when the record is missing from the arena or the task is already
    /// queued (a duplicate push does NOT update the stored priority). Each
    /// successful push is stamped with a monotonically increasing generation
    /// so equal-priority tasks pop in FIFO order.
    #[inline]
    pub fn push(&mut self, task: TaskId, priority: u8, arena: &mut Arena<TaskRecord>) {
        let Some(record) = arena.get_mut(task.arena_index()) else {
            return;
        };
        if record.heap_index.is_some() {
            return;
        }
        let generation = self.next_generation;
        self.next_generation += 1;
        record.sched_priority = priority;
        record.sched_generation = generation;
        let pos = self.heap.len();
        record.heap_index = Some(pos as u32);
        self.heap.push(task);
        self.sift_up(pos, arena);
    }

    /// Removes and returns the highest-priority task (FIFO within a priority
    /// band), or `None` when the heap is empty.
    #[inline]
    #[must_use]
    pub fn pop(&mut self, arena: &mut Arena<TaskRecord>) -> Option<TaskId> {
        if self.heap.is_empty() {
            return None;
        }
        let task = self.heap[0];
        self.remove_at(0, arena);
        Some(task)
    }

    /// Removes `task` from the heap, returning `true` if it was a member.
    ///
    /// A stale `heap_index` (pointing outside the heap or at a different
    /// task) is cleared from the record without disturbing heap contents,
    /// and `false` is returned.
    pub fn remove(&mut self, task: TaskId, arena: &mut Arena<TaskRecord>) -> bool {
        let Some(record) = arena.get(task.arena_index()) else {
            return false;
        };
        let Some(pos) = record.heap_index else {
            return false;
        };
        let pos = pos as usize;
        if pos >= self.heap.len() || self.heap[pos] != task {
            // Stale back-pointer: scrub the record so it no longer claims
            // membership, but leave the heap itself untouched.
            if let Some(record) = arena.get_mut(task.arena_index()) {
                record.heap_index = None;
                record.sched_priority = 0;
                record.sched_generation = 0;
            }
            return false;
        }
        self.remove_at(pos, arena);
        true
    }

    /// Removes the element at heap slot `pos`, restoring the heap property.
    ///
    /// Caller must ensure `pos < self.heap.len()`.
    fn remove_at(&mut self, pos: usize, arena: &mut Arena<TaskRecord>) {
        debug_assert!(pos < self.heap.len(), "remove_at out of bounds");
        let last = self.heap.len() - 1;
        // Scrub the departing task's scheduling state.
        if let Some(record) = arena.get_mut(self.heap[pos].arena_index()) {
            record.heap_index = None;
            record.sched_priority = 0;
            record.sched_generation = 0;
        }
        if pos == last {
            self.heap.pop();
            return;
        }
        // Move the last element into the vacated slot, then re-heapify: the
        // replacement may need to move either up or down.
        self.heap.swap(pos, last);
        self.heap.pop();
        if let Some(record) = arena.get_mut(self.heap[pos].arena_index()) {
            record.heap_index = Some(pos as u32);
        }
        let new_pos = self.sift_up(pos, arena);
        if new_pos == pos {
            self.sift_down(pos, arena);
        }
    }

    /// Bubbles the element at `pos` toward the root while it outranks its
    /// parent; returns its final position.
    fn sift_up(&mut self, mut pos: usize, arena: &mut Arena<TaskRecord>) -> usize {
        while pos > 0 {
            let parent = (pos - 1) / 2;
            if self.higher_priority(pos, parent, arena) {
                self.swap_positions(pos, parent, arena);
                pos = parent;
            } else {
                break;
            }
        }
        pos
    }

    /// Pushes the element at `pos` toward the leaves while a child outranks it.
    fn sift_down(&mut self, mut pos: usize, arena: &mut Arena<TaskRecord>) {
        let len = self.heap.len();
        loop {
            let left = 2 * pos + 1;
            let right = 2 * pos + 2;
            let mut largest = pos;
            if left < len && self.higher_priority(left, largest, arena) {
                largest = left;
            }
            if right < len && self.higher_priority(right, largest, arena) {
                largest = right;
            }
            if largest == pos {
                break;
            }
            self.swap_positions(pos, largest, arena);
            pos = largest;
        }
    }

    /// Returns `true` when the task at heap slot `a` outranks the one at `b`:
    /// higher `sched_priority` wins; ties go to the earlier generation (FIFO).
    fn higher_priority(&self, a: usize, b: usize, arena: &Arena<TaskRecord>) -> bool {
        let task_a = self.heap[a];
        let task_b = self.heap[b];
        // A missing record ranks as lowest priority and latest generation.
        let (prio_a, gen_a) = arena
            .get(task_a.arena_index())
            .map_or((0, u64::MAX), |r| (r.sched_priority, r.sched_generation));
        let (prio_b, gen_b) = arena
            .get(task_b.arena_index())
            .map_or((0, u64::MAX), |r| (r.sched_priority, r.sched_generation));
        match prio_a.cmp(&prio_b) {
            std::cmp::Ordering::Greater => true,
            std::cmp::Ordering::Less => false,
            // Equal priority: earlier generation wins. The wrapping
            // subtraction keeps the comparison correct even if the
            // generation counter ever wraps.
            std::cmp::Ordering::Equal => gen_b.wrapping_sub(gen_a).cast_signed() > 0,
        }
    }

    /// Swaps heap slots `a` and `b` and refreshes both records' back-pointers.
    fn swap_positions(&mut self, a: usize, b: usize, arena: &mut Arena<TaskRecord>) {
        self.heap.swap(a, b);
        if let Some(record) = arena.get_mut(self.heap[a].arena_index()) {
            record.heap_index = Some(a as u32);
        }
        if let Some(record) = arena.get_mut(self.heap[b].arena_index()) {
            record.heap_index = Some(b as u32);
        }
    }

    /// Empties the heap, scrubbing every queued task's scheduling state.
    pub fn clear(&mut self, arena: &mut Arena<TaskRecord>) {
        for &task in &self.heap {
            if let Some(record) = arena.get_mut(task.arena_index()) {
                record.heap_index = None;
                record.sched_priority = 0;
                record.sched_generation = 0;
            }
        }
        self.heap.clear();
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{Budget, RegionId};
    use crate::util::ArenaIndex;

    fn region() -> RegionId {
        RegionId::from_arena(ArenaIndex::new(0, 0))
    }

    fn task(n: u32) -> TaskId {
        TaskId::from_arena(ArenaIndex::new(n, 0))
    }

    /// Builds an arena with `count` task records at arena indices `0..count`.
    fn setup_arena(count: u32) -> Arena<TaskRecord> {
        let mut arena = Arena::new();
        for i in 0..count {
            let id = task(i);
            let record = TaskRecord::new(id, region(), Budget::INFINITE);
            let idx = arena.insert(record);
            assert_eq!(idx.index(), i);
        }
        arena
    }

    /// Drains the heap, collecting tasks in pop order.
    fn pop_all(heap: &mut IntrusivePriorityHeap, arena: &mut Arena<TaskRecord>) -> Vec<TaskId> {
        let mut popped = Vec::new();
        while let Some(task) = heap.pop(arena) {
            popped.push(task);
        }
        popped
    }

    #[test]
    fn empty_heap() {
        let heap = IntrusivePriorityHeap::new();
        assert!(heap.is_empty());
        assert_eq!(heap.len(), 0);
        assert!(heap.peek().is_none());
    }

    #[test]
    fn push_pop_single() {
        let mut arena = setup_arena(1);
        let mut heap = IntrusivePriorityHeap::new();
        heap.push(task(0), 5, &mut arena);
        assert_eq!(heap.len(), 1);
        assert_eq!(heap.peek(), Some(task(0)));
        let popped = heap.pop(&mut arena);
        assert_eq!(popped, Some(task(0)));
        assert!(heap.is_empty());
        let record = arena.get(task(0).arena_index()).unwrap();
        assert!(record.heap_index.is_none());
    }

    #[test]
    fn priority_ordering() {
        let mut arena = setup_arena(5);
        let mut heap = IntrusivePriorityHeap::new();
        heap.push(task(0), 1, &mut arena);
        heap.push(task(1), 5, &mut arena);
        heap.push(task(2), 3, &mut arena);
        heap.push(task(3), 5, &mut arena);
        heap.push(task(4), 2, &mut arena);
        let first = heap.pop(&mut arena).unwrap();
        assert_eq!(first, task(1), "highest priority, earliest generation");
        let second = heap.pop(&mut arena).unwrap();
        assert_eq!(second, task(3), "same priority as task 1, later generation");
        let third = heap.pop(&mut arena).unwrap();
        assert_eq!(third, task(2), "priority 3");
        let fourth = heap.pop(&mut arena).unwrap();
        assert_eq!(fourth, task(4), "priority 2");
        let fifth = heap.pop(&mut arena).unwrap();
        assert_eq!(fifth, task(0), "priority 1 (lowest)");
        assert!(heap.is_empty());
    }

    #[test]
    fn fifo_within_same_priority() {
        let mut arena = setup_arena(5);
        let mut heap = IntrusivePriorityHeap::new();
        for i in 0..5 {
            heap.push(task(i), 5, &mut arena);
        }
        for i in 0..5 {
            let popped = heap.pop(&mut arena).unwrap();
            assert_eq!(popped, task(i), "FIFO: expected task {i}");
        }
    }

    #[test]
    fn remove_by_task_id() {
        let mut arena = setup_arena(5);
        let mut heap = IntrusivePriorityHeap::new();
        for i in 0..5 {
            heap.push(task(i), u8::try_from(i).unwrap(), &mut arena);
        }
        assert_eq!(heap.len(), 5);
        let removed = heap.remove(task(2), &mut arena);
        assert!(removed);
        assert_eq!(heap.len(), 4);
        let record = arena.get(task(2).arena_index()).unwrap();
        assert!(record.heap_index.is_none());
        // Remaining tasks drain by descending priority (priority == index).
        assert_eq!(heap.pop(&mut arena), Some(task(4)));
        assert_eq!(heap.pop(&mut arena), Some(task(3)));
        assert_eq!(heap.pop(&mut arena), Some(task(1)));
        assert_eq!(heap.pop(&mut arena), Some(task(0)));
    }

    #[test]
    fn remove_not_in_heap() {
        let mut arena = setup_arena(2);
        let mut heap = IntrusivePriorityHeap::new();
        heap.push(task(0), 5, &mut arena);
        let removed = heap.remove(task(1), &mut arena);
        assert!(!removed);
        assert_eq!(heap.len(), 1);
    }

    #[test]
    fn contains_check() {
        let mut arena = setup_arena(3);
        let mut heap = IntrusivePriorityHeap::new();
        heap.push(task(0), 5, &mut arena);
        heap.push(task(1), 3, &mut arena);
        assert!(heap.contains(task(0), &arena));
        assert!(heap.contains(task(1), &arena));
        assert!(!heap.contains(task(2), &arena));
        let _ = heap.pop(&mut arena);
        assert!(!heap.contains(task(0), &arena));
        assert!(heap.contains(task(1), &arena));
    }

    #[test]
    fn no_duplicate_push() {
        let mut arena = setup_arena(1);
        let mut heap = IntrusivePriorityHeap::new();
        heap.push(task(0), 5, &mut arena);
        // Second push of the same task is a no-op, even with a new priority.
        heap.push(task(0), 10, &mut arena);
        assert_eq!(heap.len(), 1);
    }

    #[test]
    fn clear_resets_all() {
        let mut arena = setup_arena(5);
        let mut heap = IntrusivePriorityHeap::new();
        for i in 0..5 {
            heap.push(task(i), u8::try_from(i).unwrap(), &mut arena);
        }
        assert_eq!(heap.len(), 5);
        heap.clear(&mut arena);
        assert!(heap.is_empty());
        for i in 0..5 {
            let record = arena.get(task(i).arena_index()).unwrap();
            assert!(record.heap_index.is_none());
        }
    }

    #[test]
    fn high_volume() {
        let count = 1000u32;
        let mut arena = setup_arena(count);
        let mut heap = IntrusivePriorityHeap::with_capacity(count as usize);
        for i in 0..count {
            let priority = (i % 10) as u8;
            heap.push(task(i), priority, &mut arena);
        }
        assert_eq!(heap.len(), count as usize);
        let mut popped_count = 0u32;
        while heap.pop(&mut arena).is_some() {
            popped_count += 1;
        }
        assert_eq!(popped_count, count);
        assert!(heap.is_empty());
    }

    #[test]
    fn interleaved_push_pop() {
        let mut arena = setup_arena(10);
        let mut heap = IntrusivePriorityHeap::new();
        heap.push(task(0), 3, &mut arena);
        heap.push(task(1), 7, &mut arena);
        assert_eq!(heap.pop(&mut arena), Some(task(1)));
        heap.push(task(2), 5, &mut arena);
        heap.push(task(3), 9, &mut arena);
        assert_eq!(heap.pop(&mut arena), Some(task(3)));
        assert_eq!(heap.pop(&mut arena), Some(task(2)));
        assert_eq!(heap.pop(&mut arena), Some(task(0)));
        assert!(heap.is_empty());
    }

    #[test]
    fn metamorphic_priority_permutation_preserves_descending_pop_order() {
        let fixtures = [(0, 10), (1, 40), (2, 20), (3, 60), (4, 30), (5, 50)];
        let canonical_order = [0, 1, 2, 3, 4, 5];
        let permuted_order = [2, 5, 1, 4, 0, 3];
        let mut canonical_arena = setup_arena(fixtures.len() as u32);
        let mut canonical_heap = IntrusivePriorityHeap::new();
        for index in canonical_order {
            let (task_id, priority) = fixtures[index];
            canonical_heap.push(task(task_id), priority, &mut canonical_arena);
        }
        let mut permuted_arena = setup_arena(fixtures.len() as u32);
        let mut permuted_heap = IntrusivePriorityHeap::new();
        for index in permuted_order {
            let (task_id, priority) = fixtures[index];
            permuted_heap.push(task(task_id), priority, &mut permuted_arena);
        }
        let canonical_popped = pop_all(&mut canonical_heap, &mut canonical_arena);
        let permuted_popped = pop_all(&mut permuted_heap, &mut permuted_arena);
        let expected = vec![task(3), task(5), task(1), task(4), task(2), task(0)];
        assert_eq!(
            canonical_popped, expected,
            "canonical insertion should pop in descending priority order"
        );
        assert_eq!(
            permuted_popped, expected,
            "permuting distinct-priority insertions must preserve pop order"
        );
    }

    #[test]
    fn metamorphic_low_priority_noise_preserves_fifo_within_urgent_band() {
        let urgent_band = [task(0), task(1), task(2)];
        let mut baseline_arena = setup_arena(6);
        let mut baseline_heap = IntrusivePriorityHeap::new();
        for urgent in urgent_band {
            baseline_heap.push(urgent, 9, &mut baseline_arena);
        }
        let baseline_prefix: Vec<_> = pop_all(&mut baseline_heap, &mut baseline_arena)
            .into_iter()
            .take(urgent_band.len())
            .collect();
        let mut noisy_arena = setup_arena(6);
        let mut noisy_heap = IntrusivePriorityHeap::new();
        noisy_heap.push(task(0), 9, &mut noisy_arena);
        noisy_heap.push(task(3), 1, &mut noisy_arena);
        noisy_heap.push(task(1), 9, &mut noisy_arena);
        noisy_heap.push(task(4), 2, &mut noisy_arena);
        noisy_heap.push(task(2), 9, &mut noisy_arena);
        noisy_heap.push(task(5), 0, &mut noisy_arena);
        let noisy_popped = pop_all(&mut noisy_heap, &mut noisy_arena);
        let noisy_prefix: Vec<_> = noisy_popped
            .iter()
            .copied()
            .take(urgent_band.len())
            .collect();
        assert_eq!(
            baseline_prefix, urgent_band,
            "equal-priority urgent tasks should pop FIFO without background noise"
        );
        assert_eq!(
            noisy_prefix, urgent_band,
            "lower-priority noise must not perturb FIFO ordering within the urgent band"
        );
        assert!(
            noisy_popped
                .iter()
                .copied()
                .skip(urgent_band.len())
                .eq([task(4), task(3), task(5)]),
            "noise should still drain by descending priority after the urgent band"
        );
    }

    #[test]
    fn reuse_after_pop() {
        let mut arena = setup_arena(1);
        let mut heap = IntrusivePriorityHeap::new();
        heap.push(task(0), 5, &mut arena);
        let _ = heap.pop(&mut arena);
        // A popped task can be pushed again with a new priority.
        heap.push(task(0), 8, &mut arena);
        assert_eq!(heap.len(), 1);
        assert_eq!(heap.peek(), Some(task(0)));
        let record = arena.get(task(0).arena_index()).unwrap();
        assert_eq!(record.sched_priority, 8);
    }

    #[test]
    fn remove_head() {
        let mut arena = setup_arena(3);
        let mut heap = IntrusivePriorityHeap::new();
        heap.push(task(0), 1, &mut arena);
        heap.push(task(1), 9, &mut arena);
        heap.push(task(2), 5, &mut arena);
        let removed = heap.remove(task(1), &mut arena);
        assert!(removed);
        assert_eq!(heap.len(), 2);
        assert_eq!(heap.pop(&mut arena), Some(task(2)));
        assert_eq!(heap.pop(&mut arena), Some(task(0)));
    }

    #[test]
    fn remove_tail() {
        let mut arena = setup_arena(3);
        let mut heap = IntrusivePriorityHeap::new();
        heap.push(task(0), 9, &mut arena);
        heap.push(task(1), 5, &mut arena);
        heap.push(task(2), 1, &mut arena);
        let removed = heap.remove(task(2), &mut arena);
        assert!(removed);
        assert_eq!(heap.len(), 2);
        assert_eq!(heap.pop(&mut arena), Some(task(0)));
        assert_eq!(heap.pop(&mut arena), Some(task(1)));
    }

    #[test]
    fn contains_rejects_stale_heap_index() {
        let mut arena = setup_arena(2);
        let mut heap = IntrusivePriorityHeap::new();
        heap.push(task(0), 9, &mut arena);
        // Forge a stale back-pointer on task 1 aimed at task 0's slot.
        if let Some(record) = arena.get_mut(task(1).arena_index()) {
            record.heap_index = Some(0);
            record.sched_priority = 9;
            record.sched_generation = 0;
        }
        assert!(heap.contains(task(0), &arena));
        assert!(
            !heap.contains(task(1), &arena),
            "stale index must not be treated as membership"
        );
    }

    #[test]
    fn remove_with_stale_heap_index_is_safe_and_non_destructive() {
        let mut arena = setup_arena(2);
        let mut heap = IntrusivePriorityHeap::new();
        heap.push(task(0), 9, &mut arena);
        // Forge a stale back-pointer on task 1 aimed at task 0's slot.
        if let Some(record) = arena.get_mut(task(1).arena_index()) {
            record.heap_index = Some(0);
            record.sched_priority = 9;
            record.sched_generation = 0;
        }
        assert!(
            !heap.remove(task(1), &mut arena),
            "stale index must not remove arbitrary task"
        );
        assert_eq!(heap.len(), 1, "heap content must be preserved");
        assert_eq!(heap.peek(), Some(task(0)));
        let record = arena.get(task(1).arena_index()).unwrap();
        assert!(record.heap_index.is_none());
        assert_eq!(record.sched_priority, 0);
        assert_eq!(record.sched_generation, 0);
    }
}