use crate::record::task::TaskRecord;
use crate::types::TaskId;
use crate::util::Arena;
/// Queue tag used by `IntrusiveRing::default()`. Tag `0` is never a valid queue tag:
/// both queue constructors in this file assert against it because it means
/// "not in any queue".
pub const QUEUE_TAG_READY: u8 = 1;
/// Second queue tag, distinct from [`QUEUE_TAG_READY`].
// NOTE(review): by its name this presumably marks the cancellation queue — confirm against callers.
pub const QUEUE_TAG_CANCEL: u8 = 2;
/// FIFO queue of tasks whose links live inside the `TaskRecord`s of an external arena.
///
/// The queue itself stores only its two end ids, a length and the tag it stamps on
/// member records. Despite the name, the list is linear rather than circular: the
/// head's `prev_in_queue` and the tail's `next_in_queue` are `None`.
#[derive(Debug)]
pub struct IntrusiveRing {
/// Oldest task; the next one `pop_front` returns. `None` when the queue is empty.
head: Option<TaskId>,
/// Newest task; `push_back` links new entries after it. `None` when the queue is empty.
tail: Option<TaskId>,
/// Number of tasks currently linked.
len: usize,
/// Tag written into each member record; never `0` (enforced by `new`).
tag: u8,
}
impl IntrusiveRing {
/// Creates an empty ring that stamps `tag` onto every record it links.
///
/// # Panics
///
/// Panics when `tag` is `0`, which is reserved to mean "not in any queue".
#[must_use]
pub const fn new(tag: u8) -> Self {
    assert!(tag != 0, "queue tag 0 is reserved for \"not in any queue\"");
    Self {
        tag,
        len: 0,
        head: None,
        tail: None,
    }
}
/// Returns the number of tasks currently linked into this ring.
#[must_use]
#[inline]
pub const fn len(&self) -> usize {
self.len
}
/// Returns `true` when no task is linked into this ring (its length is zero).
#[must_use]
#[inline]
pub const fn is_empty(&self) -> bool {
self.len == 0
}
/// Returns the queue tag this ring was constructed with.
#[must_use]
#[inline]
pub const fn tag(&self) -> u8 {
self.tag
}
/// Appends `task_id` at the tail of the ring.
///
/// Does nothing when the task has no record in `arena`. A task that is already
/// linked into a queue trips a debug assertion; in release builds the call is a
/// silent no-op.
#[inline]
pub fn push_back(&mut self, task_id: TaskId, arena: &mut Arena<TaskRecord>) {
    let Some(entry) = arena.get_mut(task_id.arena_index()) else {
        return;
    };
    let already_linked = entry.is_in_queue();
    debug_assert!(
        !already_linked,
        "task {:?} already in queue (tag={})",
        task_id, entry.queue_tag
    );
    if already_linked {
        return;
    }

    // The new entry always becomes the tail: its predecessor is the old tail (if any).
    let previous_tail = self.tail;
    entry.set_queue_links(previous_tail, None, self.tag);
    if let Some(tail_id) = previous_tail {
        if let Some(tail_entry) = arena.get_mut(tail_id.arena_index()) {
            tail_entry.next_in_queue = Some(task_id);
        }
    } else {
        // The ring was empty, so the new entry is also the head.
        self.head = Some(task_id);
    }
    self.tail = Some(task_id);
    self.len += 1;
}
/// Unlinks and returns the task at the head of the ring, or `None` when empty.
///
/// # Panics
///
/// Panics if the head task no longer has a record in `arena`.
#[inline]
#[must_use]
pub fn pop_front(&mut self, arena: &mut Arena<TaskRecord>) -> Option<TaskId> {
    let front = self.head?;
    let front_record = arena
        .get_mut(front.arena_index())
        .expect("intrusive list broken: task removed from arena while in queue");
    debug_assert!(
        front_record.is_in_queue_tag(self.tag),
        "head task {:?} has wrong tag (expected {}, got {})",
        front,
        self.tag,
        front_record.queue_tag
    );
    // Read the successor before wiping the popped record's links.
    let successor = front_record.next_in_queue;
    front_record.clear_queue_links();

    self.head = successor;
    if let Some(successor_id) = successor {
        if let Some(successor_record) = arena.get_mut(successor_id.arena_index()) {
            successor_record.prev_in_queue = None;
        }
    } else {
        // The popped task was the only entry.
        self.tail = None;
    }
    self.len -= 1;
    Some(front)
}
/// Unlinks `task_id` from anywhere in the ring.
///
/// Returns `false`, changing nothing, when the task has no record in `arena` or
/// its record does not carry this ring's tag; returns `true` after unlinking.
/// Membership is judged by tag alone.
#[inline]
pub fn remove(&mut self, task_id: TaskId, arena: &mut Arena<TaskRecord>) -> bool {
    let (before, after) = match arena.get_mut(task_id.arena_index()) {
        Some(entry) if entry.is_in_queue_tag(self.tag) => {
            let neighbours = (entry.prev_in_queue, entry.next_in_queue);
            entry.clear_queue_links();
            neighbours
        }
        _ => return false,
    };

    // Bridge the predecessor (or the head pointer) over the removed entry.
    if let Some(before_id) = before {
        if let Some(before_entry) = arena.get_mut(before_id.arena_index()) {
            before_entry.next_in_queue = after;
        }
    } else {
        self.head = after;
    }

    // Bridge the successor (or the tail pointer) back to the predecessor.
    if let Some(after_id) = after {
        if let Some(after_entry) = arena.get_mut(after_id.arena_index()) {
            after_entry.prev_in_queue = before;
        }
    } else {
        self.tail = before;
    }

    self.len -= 1;
    true
}
/// Reports whether the record for `task_id` carries this ring's tag.
///
/// Returns `false` when the task has no record in `arena`. The check looks only
/// at the record's tag; it does not walk the list.
#[must_use]
pub fn contains(&self, task_id: TaskId, arena: &Arena<TaskRecord>) -> bool {
    match arena.get(task_id.arena_index()) {
        Some(entry) => entry.is_in_queue_tag(self.tag),
        None => false,
    }
}
/// Returns the task at the head of the ring without unlinking it, or `None` when empty.
#[must_use]
#[inline]
pub const fn peek_front(&self) -> Option<TaskId> {
self.head
}
/// Empties the ring, clearing the queue links of each member record.
///
/// The walk follows `next_in_queue` from the head and stops early if a record
/// is missing from `arena`; the ring's own head, tail and length are reset
/// regardless.
pub fn clear(&mut self, arena: &mut Arena<TaskRecord>) {
    let mut cursor = self.head.take();
    while let Some(id) = cursor {
        let Some(entry) = arena.get_mut(id.arena_index()) else {
            break;
        };
        // Capture the successor before the links are wiped.
        cursor = entry.next_in_queue;
        entry.clear_queue_links();
    }
    self.tail = None;
    self.len = 0;
}
}
impl Default for IntrusiveRing {
fn default() -> Self {
Self::new(QUEUE_TAG_READY)
}
}
/// LIFO stack of tasks whose links live inside the `TaskRecord`s of an external arena.
///
/// The owner pushes and pops at the top; the steal methods remove tasks from the
/// bottom (oldest first). Within the stack, a record's `prev_in_queue` points
/// toward the top and its `next_in_queue` points toward the bottom.
#[derive(Debug)]
pub struct IntrusiveStack {
/// Most recently pushed task; `None` when the stack is empty.
top: Option<TaskId>,
/// Oldest task, the first one to be stolen; `None` when the stack is empty.
bottom: Option<TaskId>,
/// Number of tasks currently linked.
len: usize,
/// Count of linked tasks whose record reported `is_local()` when pushed.
/// The non-local fast paths raise it back to at least 1 if they find a local
/// task while it reads 0.
local_count: usize,
/// Tag written into each member record; never `0` (enforced by `new`).
tag: u8,
}
impl IntrusiveStack {
/// Creates an empty stack that stamps `tag` onto every record it links.
///
/// # Panics
///
/// Panics when `tag` is `0`, which is reserved to mean "not in any queue".
#[must_use]
pub const fn new(tag: u8) -> Self {
    assert!(tag != 0, "queue tag 0 is reserved for \"not in any queue\"");
    Self {
        tag,
        len: 0,
        local_count: 0,
        top: None,
        bottom: None,
    }
}
/// Returns the number of tasks currently linked into this stack.
#[must_use]
#[inline]
pub const fn len(&self) -> usize {
self.len
}
/// Returns `true` when no task is linked into this stack (its length is zero).
#[must_use]
#[inline]
pub const fn is_empty(&self) -> bool {
self.len == 0
}
/// Returns `true` when the stack's local-task counter is non-zero.
///
/// This reads the counter only; it does not inspect the linked records.
#[must_use]
#[inline]
pub const fn has_local_tasks(&self) -> bool {
self.local_count != 0
}
/// Pushes `task_id` onto the top of the stack.
///
/// Silently does nothing when the task has no record in `arena` or its record
/// is already linked into a queue. Tasks whose record reports `is_local()`
/// also bump the local-task counter.
#[inline]
pub fn push(&mut self, task_id: TaskId, arena: &mut Arena<TaskRecord>) {
    let Some(entry) = arena.get_mut(task_id.arena_index()) else {
        return;
    };
    if entry.is_in_queue() {
        return;
    }
    let counts_as_local = entry.is_local();

    // The new entry sits above the previous top (if any).
    let previous_top = self.top;
    entry.set_queue_links(None, previous_top, self.tag);
    if let Some(below) = previous_top {
        if let Some(below_entry) = arena.get_mut(below.arena_index()) {
            below_entry.prev_in_queue = Some(task_id);
        }
    } else {
        // First entry: it is the bottom as well as the top.
        self.bottom = Some(task_id);
    }
    self.top = Some(task_id);

    self.len += 1;
    self.local_count += usize::from(counts_as_local);
}
/// Pushes a task the caller believes to be non-local onto the top of the stack.
///
/// The belief is not trusted: a task whose record reports `is_local()` is still
/// counted as local. The observable behaviour is therefore exactly that of
/// [`Self::push`] — a missing record or an already-queued task is a silent
/// no-op, and otherwise the task is linked on top. This method simply
/// delegates, instead of carrying a second copy of the linking code and an
/// extra arena lookup.
#[inline]
#[allow(dead_code)]
pub(crate) fn push_assume_non_local(&mut self, task_id: TaskId, arena: &mut Arena<TaskRecord>) {
    self.push(task_id, arena);
}
/// Inserts `task_id` at the bottom of the stack, making it the next steal
/// candidate and the last task the owner would pop.
///
/// Silently does nothing when the task has no record in `arena` or its record
/// is already linked into a queue. Tasks whose record reports `is_local()`
/// also bump the local-task counter.
#[inline]
pub fn push_bottom(&mut self, task_id: TaskId, arena: &mut Arena<TaskRecord>) {
    let Some(entry) = arena.get_mut(task_id.arena_index()) else {
        return;
    };
    if entry.is_in_queue() {
        return;
    }
    let counts_as_local = entry.is_local();

    // The new entry sits below the previous bottom (if any).
    let previous_bottom = self.bottom;
    entry.set_queue_links(previous_bottom, None, self.tag);
    if let Some(above) = previous_bottom {
        if let Some(above_entry) = arena.get_mut(above.arena_index()) {
            above_entry.next_in_queue = Some(task_id);
        }
    } else {
        // First entry: it is the top as well as the bottom.
        self.top = Some(task_id);
    }
    self.bottom = Some(task_id);

    self.len += 1;
    self.local_count += usize::from(counts_as_local);
}
/// Unlinks and returns the task at the top of the stack, or `None` when empty.
///
/// If the popped record reports `is_local()`, the local-task counter is
/// decremented. A counter that is already zero trips a debug assertion; in
/// release builds the decrement saturates at zero instead of wrapping around
/// to `usize::MAX`.
///
/// # Panics
///
/// Panics if the top task no longer has a record in `arena`.
#[inline]
#[must_use]
pub fn pop(&mut self, arena: &mut Arena<TaskRecord>) -> Option<TaskId> {
    let top_id = self.top?;
    let (next_down, is_local) = {
        let record = arena
            .get_mut(top_id.arena_index())
            .expect("intrusive list broken: task removed from arena while in queue");
        let is_local = record.is_local();
        // Read the link toward the bottom before wiping the record's links.
        let next_down = record.next_in_queue;
        record.clear_queue_links();
        (next_down, is_local)
    };
    self.top = next_down;
    match next_down {
        None => {
            // The popped task was the only entry.
            self.bottom = None;
        }
        Some(new_top) => {
            if let Some(new_top_record) = arena.get_mut(new_top.arena_index()) {
                new_top_record.prev_in_queue = None;
            }
        }
    }
    self.len -= 1;
    if is_local {
        debug_assert!(self.local_count > 0);
        // Saturate so a stale counter cannot wrap in builds without overflow checks.
        self.local_count = self.local_count.saturating_sub(1);
    }
    Some(top_id)
}
/// Steals a batch of tasks from the bottom of the stack, oldest first, and
/// appends their ids to `stolen`.
///
/// The batch size is half the current length (at least one), capped at
/// `max_steal`. Stealing stops early if a single steal yields nothing.
pub fn steal_batch(
    &mut self,
    max_steal: usize,
    arena: &mut Arena<TaskRecord>,
    stolen: &mut Vec<TaskId>,
) {
    if self.is_empty() {
        return;
    }
    let quota = (self.len / 2).max(1).min(max_steal);
    // `from_fn` ends at the first failed steal; `take` enforces the quota.
    let steals = std::iter::from_fn(|| {
        self.steal_one_with_locality(arena)
            .map(|(oldest, _is_local)| oldest)
    });
    stolen.extend(steals.take(quota));
}
/// Steals a batch of tasks from the bottom of this stack and pushes each onto
/// the top of `dest`, returning how many were moved.
///
/// The batch size is half the current length (at least one), capped at
/// `max_steal`. Stealing stops early if a single steal yields nothing.
pub fn steal_batch_into(
    &mut self,
    max_steal: usize,
    arena: &mut Arena<TaskRecord>,
    dest: &mut Self,
) -> usize {
    if self.is_empty() {
        return 0;
    }
    let quota = (self.len / 2).max(1).min(max_steal);
    let mut moved = 0;
    while moved < quota {
        let Some((oldest, _is_local)) = self.steal_one_with_locality(arena) else {
            break;
        };
        dest.push(oldest, arena);
        moved += 1;
    }
    moved
}
/// Batch steal for stacks believed to hold no local tasks; returns the number
/// of tasks moved to `dest`.
///
/// The batch size is half the current length (at least one), capped at
/// `max_steal`. When source and destination share a tag the stolen segment is
/// spliced across; otherwise each stolen record is relinked under `dest`'s tag.
/// Calling this while the local-task counter is non-zero trips a debug
/// assertion.
#[inline]
#[allow(dead_code)]
pub(crate) fn steal_batch_into_non_local(
    &mut self,
    max_steal: usize,
    arena: &mut Arena<TaskRecord>,
    dest: &mut Self,
) -> usize {
    debug_assert!(
        !self.has_local_tasks(),
        "steal_batch_into_non_local called on stack with local tasks"
    );
    if self.is_empty() {
        return 0;
    }
    let quota = (self.len / 2).max(1).min(max_steal);
    if self.tag != dest.tag {
        self.rebuild_non_local_batch(quota, arena, dest)
    } else {
        self.splice_same_tag_non_local_batch(quota, arena, dest)
    }
}
/// Moves up to `steal_count` non-local tasks from the bottom of `self` to the top
/// of `dest` by relinking only the ends of the stolen segment.
///
/// The stolen records keep their tag and their links to each other; only the
/// segment's two ends and its new neighbours are rewritten. Returns the number of
/// tasks moved, or 0 (with nothing modified apart from a possible `local_count`
/// correction) when nothing could be stolen or a record needed for the splice is
/// missing from `arena`.
// NOTE(review): the only caller visible in this file invokes this when `self.tag == dest.tag`.
#[inline]
fn splice_same_tag_non_local_batch(
&mut self,
steal_count: usize,
arena: &mut Arena<TaskRecord>,
dest: &mut Self,
) -> usize {
let Some(segment_bottom) = self.bottom else {
return 0;
};
// Read-only pass: walk upward from the bottom to find the extent of the segment.
let mut segment_top = segment_bottom;
let mut new_src_bottom = None;
let mut stolen = 0usize;
let mut current = segment_bottom;
while stolen < steal_count {
let Some(record) = arena.get(current.arena_index()) else {
break;
};
if record.is_local() {
// A local task is present although the counter may read zero:
// make sure the counter is at least 1 and stop before taking it.
self.local_count = self.local_count.max(1);
break;
}
segment_top = current;
new_src_bottom = record.prev_in_queue;
stolen += 1;
match record.prev_in_queue {
Some(next_up) => current = next_up,
None => break,
}
}
if stolen == 0 {
return 0;
}
// Validate before mutating: both records that will be stitched to the segment
// must still exist, otherwise abort without touching either stack.
if new_src_bottom.is_some_and(|task_id| arena.get(task_id.arena_index()).is_none())
|| dest
.top
.is_some_and(|task_id| arena.get(task_id.arena_index()).is_none())
{
return 0;
}
// Detach the segment from the source stack.
self.bottom = new_src_bottom;
match new_src_bottom {
None => {
self.top = None;
}
Some(new_bottom) => {
if let Some(new_bottom_record) = arena.get_mut(new_bottom.arena_index()) {
new_bottom_record.next_in_queue = None;
}
}
}
self.len -= stolen;
// The segment's top becomes the destination's top, so nothing sits above it.
if let Some(segment_top_record) = arena.get_mut(segment_top.arena_index()) {
segment_top_record.prev_in_queue = None;
}
// Attach the segment above the destination's existing entries.
match dest.top {
None => {
if let Some(segment_bottom_record) = arena.get_mut(segment_bottom.arena_index()) {
segment_bottom_record.next_in_queue = None;
}
dest.top = Some(segment_top);
dest.bottom = Some(segment_bottom);
}
Some(old_top) => {
if let Some(segment_bottom_record) = arena.get_mut(segment_bottom.arena_index()) {
segment_bottom_record.next_in_queue = Some(old_top);
}
if let Some(old_top_record) = arena.get_mut(old_top.arena_index()) {
old_top_record.prev_in_queue = Some(segment_bottom);
}
dest.top = Some(segment_top);
}
}
dest.len += stolen;
stolen
}
/// Moves up to `steal_count` non-local tasks, one at a time, from the bottom of
/// `self` to the top of `dest`, rewriting each record's links and tag for `dest`.
///
/// Stops at the first local task (raising `local_count` to at least 1), when the
/// source runs dry, or when a record is missing from `arena`. Returns the number
/// of tasks moved.
#[inline]
fn rebuild_non_local_batch(
    &mut self,
    steal_count: usize,
    arena: &mut Arena<TaskRecord>,
    dest: &mut Self,
) -> usize {
    let mut moved = 0;
    while moved < steal_count {
        let Some(oldest) = self.bottom else {
            break;
        };
        let Some(oldest_record) = arena.get_mut(oldest.arena_index()) else {
            break;
        };
        if oldest_record.is_local() {
            // A local task is present although the counter may read zero.
            self.local_count = self.local_count.max(1);
            break;
        }
        let above = oldest_record.prev_in_queue;

        // Detach the oldest entry from the source stack.
        self.bottom = above;
        if let Some(new_bottom) = above {
            if let Some(new_bottom_record) = arena.get_mut(new_bottom.arena_index()) {
                new_bottom_record.next_in_queue = None;
            }
        } else {
            self.top = None;
        }
        self.len -= 1;

        // Relink it on top of the destination under the destination's tag.
        let Some(oldest_record) = arena.get_mut(oldest.arena_index()) else {
            break;
        };
        let dest_top = dest.top;
        oldest_record.set_queue_links(None, dest_top, dest.tag);
        if let Some(below) = dest_top {
            if let Some(below_record) = arena.get_mut(below.arena_index()) {
                below_record.prev_in_queue = Some(oldest);
            }
        } else {
            dest.bottom = Some(oldest);
        }
        dest.top = Some(oldest);
        dest.len += 1;

        moved += 1;
    }
    moved
}
/// Unlinks the task at the bottom of the stack (the oldest) and returns it
/// together with whether its record reported `is_local()`.
///
/// Returns `None` when the stack is empty or the bottom task has no record in
/// `arena`. For a local task the local-task counter is decremented. A counter
/// that is already zero trips a debug assertion; in release builds the
/// decrement saturates at zero instead of wrapping around to `usize::MAX`.
#[inline]
#[must_use]
pub(crate) fn steal_one_with_locality(
    &mut self,
    arena: &mut Arena<TaskRecord>,
) -> Option<(TaskId, bool)> {
    let bottom_id = self.bottom?;
    let (prev_up, is_local) = {
        let record = arena.get_mut(bottom_id.arena_index())?;
        let is_local = record.is_local();
        // Read the link toward the top before wiping the record's links.
        let prev_up = record.prev_in_queue;
        record.clear_queue_links();
        (prev_up, is_local)
    };
    self.bottom = prev_up;
    match prev_up {
        None => {
            // The stolen task was the only entry.
            self.top = None;
        }
        Some(new_bottom) => {
            if let Some(new_bottom_record) = arena.get_mut(new_bottom.arena_index()) {
                new_bottom_record.next_in_queue = None;
            }
        }
    }
    self.len -= 1;
    if is_local {
        debug_assert!(self.local_count > 0);
        // Saturate so a stale counter cannot wrap in builds without overflow checks.
        self.local_count = self.local_count.saturating_sub(1);
    }
    Some((bottom_id, is_local))
}
/// Steals the bottom task of a stack believed to hold no local tasks.
///
/// Returns `None` when the stack is empty, the bottom task has no record in
/// `arena`, or the bottom task turns out to be local. In the last case the task
/// stays in place and the local-task counter is raised to at least 1. Calling
/// this while the counter is non-zero trips a debug assertion.
#[inline]
#[must_use]
#[allow(dead_code)]
pub(crate) fn steal_one_assume_non_local(
    &mut self,
    arena: &mut Arena<TaskRecord>,
) -> Option<TaskId> {
    debug_assert!(
        !self.has_local_tasks(),
        "steal_one_assume_non_local called on stack with local tasks"
    );
    let oldest = self.bottom?;
    let oldest_record = arena.get_mut(oldest.arena_index())?;
    if oldest_record.is_local() {
        self.local_count = self.local_count.max(1);
        return None;
    }
    // Read the link toward the top before wiping the record's links.
    let above = oldest_record.prev_in_queue;
    oldest_record.clear_queue_links();

    self.bottom = above;
    if let Some(new_bottom) = above {
        if let Some(new_bottom_record) = arena.get_mut(new_bottom.arena_index()) {
            new_bottom_record.next_in_queue = None;
        }
    } else {
        // The stolen task was the only entry.
        self.top = None;
    }
    self.len -= 1;
    Some(oldest)
}
/// Steals the task at the bottom of the stack, discarding the locality flag
/// reported by [`Self::steal_one_with_locality`].
#[inline]
#[must_use]
#[allow(dead_code)]
pub(crate) fn steal_one(&mut self, arena: &mut Arena<TaskRecord>) -> Option<TaskId> {
    let (task_id, _is_local) = self.steal_one_with_locality(arena)?;
    Some(task_id)
}
/// Reports whether the record for `task_id` carries this stack's tag.
///
/// Returns `false` when the task has no record in `arena`. The check looks only
/// at the record's tag; it does not walk the list.
#[must_use]
pub fn contains(&self, task_id: TaskId, arena: &Arena<TaskRecord>) -> bool {
    match arena.get(task_id.arena_index()) {
        Some(entry) => entry.is_in_queue_tag(self.tag),
        None => false,
    }
}
}
// Unit tests for `IntrusiveRing` (FIFO) and `IntrusiveStack` (LIFO with bottom steals).
#[cfg(test)]
mod tests {
use super::*;
use crate::record::task::TaskRecord;
use crate::types::{Budget, RegionId};
use crate::util::ArenaIndex;
// Region id shared by every test record.
fn region() -> RegionId {
RegionId::from_arena(ArenaIndex::new(0, 0))
}
// Task id for arena slot `n` (second index argument 0).
fn task(n: u32) -> TaskId {
TaskId::from_arena(ArenaIndex::new(n, 0))
}
// Builds an arena holding `count` records so that `task(i)` addresses record `i`.
fn setup_arena(count: u32) -> Arena<TaskRecord> {
let mut arena = Arena::new();
for i in 0..count {
let id = task(i);
let record = TaskRecord::new(id, region(), Budget::INFINITE);
let idx = arena.insert(record);
assert_eq!(idx.index(), i);
}
arena
}
// Drains a ring with `pop_front`, returning the ids in pop order.
fn pop_all_ring(ring: &mut IntrusiveRing, arena: &mut Arena<TaskRecord>) -> Vec<TaskId> {
let mut popped = Vec::new();
while let Some(task_id) = ring.pop_front(arena) {
popped.push(task_id);
}
popped
}
// Drains a stack with `pop`, returning the ids in pop order.
fn pop_all_stack(stack: &mut IntrusiveStack, arena: &mut Arena<TaskRecord>) -> Vec<TaskId> {
let mut popped = Vec::new();
while let Some(task_id) = stack.pop(arena) {
popped.push(task_id);
}
popped
}
// A fresh ring is empty and has no front element.
#[test]
fn empty_queue() {
let ring = IntrusiveRing::new(QUEUE_TAG_READY);
assert!(ring.is_empty());
assert_eq!(ring.len(), 0);
assert!(ring.peek_front().is_none());
}
// `Default` produces a working ring tagged `QUEUE_TAG_READY`.
#[test]
fn default_ring_uses_ready_tag() {
let mut arena = setup_arena(1);
let mut ring = IntrusiveRing::default();
assert_eq!(ring.tag(), QUEUE_TAG_READY);
ring.push_back(task(0), &mut arena);
assert_eq!(ring.pop_front(&mut arena), Some(task(0)));
assert!(ring.is_empty());
}
// Constructing a ring with the reserved tag 0 panics.
#[test]
#[should_panic(expected = "queue tag 0 is reserved")]
fn ring_rejects_zero_tag() {
let _ring = IntrusiveRing::new(0);
}
// Constructing a stack with the reserved tag 0 panics.
#[test]
#[should_panic(expected = "queue tag 0 is reserved")]
fn stack_rejects_zero_tag() {
let _stack = IntrusiveStack::new(0);
}
// Pushing then popping one task leaves the ring empty and the record unlinked.
#[test]
fn push_pop_single() {
let mut arena = setup_arena(1);
let mut ring = IntrusiveRing::new(QUEUE_TAG_READY);
ring.push_back(task(0), &mut arena);
assert_eq!(ring.len(), 1);
assert!(!ring.is_empty());
assert_eq!(ring.peek_front(), Some(task(0)));
let popped = ring.pop_front(&mut arena);
assert_eq!(popped, Some(task(0)));
assert!(ring.is_empty());
assert_eq!(ring.len(), 0);
let record = arena.get(task(0).arena_index()).unwrap();
assert!(!record.is_in_queue());
}
// Tasks come out of the ring in the order they were pushed.
#[test]
fn fifo_ordering() {
let mut arena = setup_arena(5);
let mut ring = IntrusiveRing::new(QUEUE_TAG_READY);
for i in 0..5 {
ring.push_back(task(i), &mut arena);
}
assert_eq!(ring.len(), 5);
for i in 0..5 {
let popped = ring.pop_front(&mut arena);
assert_eq!(popped, Some(task(i)), "expected task {i}");
}
assert!(ring.is_empty());
}
// Removing an interior task keeps the remaining FIFO order intact.
#[test]
fn remove_from_middle() {
let mut arena = setup_arena(5);
let mut ring = IntrusiveRing::new(QUEUE_TAG_READY);
for i in 0..5 {
ring.push_back(task(i), &mut arena);
}
let removed = ring.remove(task(2), &mut arena);
assert!(removed);
assert_eq!(ring.len(), 4);
let record = arena.get(task(2).arena_index()).unwrap();
assert!(!record.is_in_queue());
assert_eq!(ring.pop_front(&mut arena), Some(task(0)));
assert_eq!(ring.pop_front(&mut arena), Some(task(1)));
assert_eq!(ring.pop_front(&mut arena), Some(task(3)));
assert_eq!(ring.pop_front(&mut arena), Some(task(4)));
assert!(ring.is_empty());
}
// Removing the head advances the head to the next task.
#[test]
fn remove_head() {
let mut arena = setup_arena(3);
let mut ring = IntrusiveRing::new(QUEUE_TAG_READY);
for i in 0..3 {
ring.push_back(task(i), &mut arena);
}
let removed = ring.remove(task(0), &mut arena);
assert!(removed);
assert_eq!(ring.len(), 2);
assert_eq!(ring.pop_front(&mut arena), Some(task(1)));
assert_eq!(ring.pop_front(&mut arena), Some(task(2)));
}
// Removing the tail leaves the earlier tasks poppable in order.
#[test]
fn remove_tail() {
let mut arena = setup_arena(3);
let mut ring = IntrusiveRing::new(QUEUE_TAG_READY);
for i in 0..3 {
ring.push_back(task(i), &mut arena);
}
let removed = ring.remove(task(2), &mut arena);
assert!(removed);
assert_eq!(ring.len(), 2);
assert_eq!(ring.pop_front(&mut arena), Some(task(0)));
assert_eq!(ring.pop_front(&mut arena), Some(task(1)));
}
// Removing the sole task resets both head and tail.
#[test]
fn remove_only_element() {
let mut arena = setup_arena(1);
let mut ring = IntrusiveRing::new(QUEUE_TAG_READY);
ring.push_back(task(0), &mut arena);
let removed = ring.remove(task(0), &mut arena);
assert!(removed);
assert!(ring.is_empty());
assert!(ring.head.is_none());
assert!(ring.tail.is_none());
}
// Removing a task that was never pushed reports `false` and changes nothing.
#[test]
fn remove_not_in_queue() {
let mut arena = setup_arena(2);
let mut ring = IntrusiveRing::new(QUEUE_TAG_READY);
ring.push_back(task(0), &mut arena);
let removed = ring.remove(task(1), &mut arena);
assert!(!removed);
assert_eq!(ring.len(), 1);
}
// `contains` tracks pushes and pops.
#[test]
fn contains() {
let mut arena = setup_arena(3);
let mut ring = IntrusiveRing::new(QUEUE_TAG_READY);
ring.push_back(task(0), &mut arena);
ring.push_back(task(1), &mut arena);
assert!(ring.contains(task(0), &arena));
assert!(ring.contains(task(1), &arena));
assert!(!ring.contains(task(2), &arena));
let _ = ring.pop_front(&mut arena);
assert!(!ring.contains(task(0), &arena));
assert!(ring.contains(task(1), &arena));
}
// `clear` empties the ring and unlinks every record.
#[test]
fn clear() {
let mut arena = setup_arena(5);
let mut ring = IntrusiveRing::new(QUEUE_TAG_READY);
for i in 0..5 {
ring.push_back(task(i), &mut arena);
}
ring.clear(&mut arena);
assert!(ring.is_empty());
for i in 0..5 {
let record = arena.get(task(i).arena_index()).unwrap();
assert!(!record.is_in_queue());
}
}
// Rings with different tags sharing one arena do not see each other's tasks.
#[test]
fn different_queue_tags() {
let mut arena = setup_arena(4);
let mut ready_ring = IntrusiveRing::new(QUEUE_TAG_READY);
let mut cancel_ring = IntrusiveRing::new(QUEUE_TAG_CANCEL);
ready_ring.push_back(task(0), &mut arena);
ready_ring.push_back(task(1), &mut arena);
cancel_ring.push_back(task(2), &mut arena);
cancel_ring.push_back(task(3), &mut arena);
assert!(ready_ring.contains(task(0), &arena));
assert!(ready_ring.contains(task(1), &arena));
assert!(!ready_ring.contains(task(2), &arena));
assert!(!ready_ring.contains(task(3), &arena));
assert!(!cancel_ring.contains(task(0), &arena));
assert!(!cancel_ring.contains(task(1), &arena));
assert!(cancel_ring.contains(task(2), &arena));
assert!(cancel_ring.contains(task(3), &arena));
assert!(!ready_ring.remove(task(2), &mut arena));
assert!(!cancel_ring.remove(task(0), &mut arena));
assert!(ready_ring.remove(task(0), &mut arena));
assert!(cancel_ring.remove(task(2), &mut arena));
}
// FIFO order holds when pushes and pops are interleaved.
#[test]
fn interleaved_push_pop() {
let mut arena = setup_arena(10);
let mut ring = IntrusiveRing::new(QUEUE_TAG_READY);
ring.push_back(task(0), &mut arena);
ring.push_back(task(1), &mut arena);
assert_eq!(ring.pop_front(&mut arena), Some(task(0)));
ring.push_back(task(2), &mut arena);
assert_eq!(ring.pop_front(&mut arena), Some(task(1)));
assert_eq!(ring.pop_front(&mut arena), Some(task(2)));
ring.push_back(task(3), &mut arena);
ring.push_back(task(4), &mut arena);
ring.push_back(task(5), &mut arena);
assert_eq!(ring.len(), 3);
assert_eq!(ring.pop_front(&mut arena), Some(task(3)));
assert_eq!(ring.pop_front(&mut arena), Some(task(4)));
assert_eq!(ring.pop_front(&mut arena), Some(task(5)));
assert!(ring.is_empty());
}
// FIFO order holds for 1000 tasks.
#[test]
fn high_volume() {
let count = 1000u32;
let mut arena = setup_arena(count);
let mut ring = IntrusiveRing::new(QUEUE_TAG_READY);
for i in 0..count {
ring.push_back(task(i), &mut arena);
}
assert_eq!(ring.len(), count as usize);
for i in 0..count {
let popped = ring.pop_front(&mut arena);
assert_eq!(popped, Some(task(i)));
}
assert!(ring.is_empty());
}
// A popped task can be pushed again.
#[test]
fn reuse_after_pop() {
let mut arena = setup_arena(2);
let mut ring = IntrusiveRing::new(QUEUE_TAG_READY);
ring.push_back(task(0), &mut arena);
assert_eq!(ring.pop_front(&mut arena), Some(task(0)));
ring.push_back(task(0), &mut arena);
ring.push_back(task(1), &mut arena);
assert_eq!(ring.pop_front(&mut arena), Some(task(0)));
assert_eq!(ring.pop_front(&mut arena), Some(task(1)));
}
// Removing a set of tasks yields the same pop order as filtering them out of a full drain.
#[test]
fn metamorphic_ring_remove_matches_fifo_filter() {
let mut baseline_arena = setup_arena(8);
let mut filtered_arena = setup_arena(8);
let mut baseline = IntrusiveRing::new(QUEUE_TAG_READY);
let mut filtered = IntrusiveRing::new(QUEUE_TAG_READY);
let removed = [task(1), task(4), task(6)];
for i in 0..8 {
baseline.push_back(task(i), &mut baseline_arena);
filtered.push_back(task(i), &mut filtered_arena);
}
for task_id in removed {
assert!(filtered.remove(task_id, &mut filtered_arena));
}
let expected: Vec<_> = pop_all_ring(&mut baseline, &mut baseline_arena)
.into_iter()
.filter(|task_id| !removed.contains(task_id))
.collect();
let actual = pop_all_ring(&mut filtered, &mut filtered_arena);
assert_eq!(actual, expected);
for task_id in removed {
let record = filtered_arena
.get(task_id.arena_index())
.expect("removed task missing");
assert!(!record.is_in_queue());
}
}
// A fresh stack is empty.
#[test]
fn stack_empty() {
let stack = IntrusiveStack::new(QUEUE_TAG_READY);
assert!(stack.is_empty());
assert_eq!(stack.len(), 0);
}
// Pushing then popping one task leaves the stack empty.
#[test]
fn stack_push_pop_single() {
let mut arena = setup_arena(1);
let mut stack = IntrusiveStack::new(QUEUE_TAG_READY);
stack.push(task(0), &mut arena);
assert_eq!(stack.len(), 1);
assert!(!stack.is_empty());
let popped = stack.pop(&mut arena);
assert_eq!(popped, Some(task(0)));
assert!(stack.is_empty());
}
// `has_local_tasks` follows a local task entering and leaving the stack.
#[test]
fn stack_tracks_local_task_presence() {
let mut arena = setup_arena(2);
let mut stack = IntrusiveStack::new(QUEUE_TAG_READY);
arena
.get_mut(task(0).arena_index())
.expect("task record missing")
.mark_local();
assert!(!stack.has_local_tasks());
stack.push(task(0), &mut arena);
stack.push(task(1), &mut arena);
assert!(stack.has_local_tasks());
let (stolen_id, is_local) = stack
.steal_one_with_locality(&mut arena)
.expect("oldest task missing");
assert_eq!(stolen_id, task(0));
assert!(is_local);
assert!(!stack.has_local_tasks());
assert_eq!(stack.pop(&mut arena), Some(task(1)));
assert!(!stack.has_local_tasks());
}
// Tasks come off the stack in reverse push order.
#[test]
fn stack_lifo_ordering() {
let mut arena = setup_arena(5);
let mut stack = IntrusiveStack::new(QUEUE_TAG_READY);
for i in 0..5 {
stack.push(task(i), &mut arena);
}
assert_eq!(stack.len(), 5);
for i in (0..5).rev() {
let popped = stack.pop(&mut arena);
assert_eq!(popped, Some(task(i)), "expected task {i}");
}
assert!(stack.is_empty());
}
// Returning a stolen task with `push_bottom` restores the original pop order.
#[test]
fn stack_push_bottom_restores_owner_visible_order() {
let mut arena = setup_arena(3);
let mut stack = IntrusiveStack::new(QUEUE_TAG_READY);
stack.push(task(0), &mut arena);
stack.push(task(1), &mut arena);
stack.push(task(2), &mut arena);
let oldest = stack.steal_one(&mut arena).expect("oldest task missing");
assert_eq!(oldest, task(0));
stack.push_bottom(oldest, &mut arena);
assert_eq!(stack.pop(&mut arena), Some(task(2)));
assert_eq!(stack.pop(&mut arena), Some(task(1)));
assert_eq!(stack.pop(&mut arena), Some(task(0)));
assert_eq!(stack.pop(&mut arena), None);
}
// A batch steal takes the oldest tasks in push order and leaves the newest for the owner.
#[test]
fn stack_steal_fifo() {
let mut arena = setup_arena(8);
let mut stack = IntrusiveStack::new(QUEUE_TAG_READY);
for i in 0..8 {
stack.push(task(i), &mut arena);
}
let mut stolen = Vec::new();
stack.steal_batch(4, &mut arena, &mut stolen);
assert_eq!(stolen.len(), 4);
for (i, task_id) in stolen.into_iter().enumerate() {
assert_eq!(task_id, task(i as u32), "stolen task {i}");
}
assert_eq!(stack.len(), 4);
assert_eq!(stack.pop(&mut arena), Some(task(7)));
assert_eq!(stack.pop(&mut arena), Some(task(6)));
assert_eq!(stack.pop(&mut arena), Some(task(5)));
assert_eq!(stack.pop(&mut arena), Some(task(4)));
}
// Cross-tag non-local batch steal: the last stolen task ends up on top of the destination.
#[test]
fn stack_steal_batch_into_non_local_preserves_ordering() {
let mut arena = setup_arena(8);
let mut src = IntrusiveStack::new(QUEUE_TAG_READY);
let mut dest = IntrusiveStack::new(QUEUE_TAG_CANCEL);
for i in 0..8 {
src.push(task(i), &mut arena);
}
let stolen = src.steal_batch_into_non_local(4, &mut arena, &mut dest);
assert_eq!(stolen, 4);
assert!(!src.has_local_tasks());
assert!(!dest.has_local_tasks());
assert_eq!(dest.pop(&mut arena), Some(task(3)));
assert_eq!(dest.pop(&mut arena), Some(task(2)));
assert_eq!(dest.pop(&mut arena), Some(task(1)));
assert_eq!(dest.pop(&mut arena), Some(task(0)));
assert_eq!(dest.pop(&mut arena), None);
assert_eq!(src.pop(&mut arena), Some(task(7)));
assert_eq!(src.pop(&mut arena), Some(task(6)));
assert_eq!(src.pop(&mut arena), Some(task(5)));
assert_eq!(src.pop(&mut arena), Some(task(4)));
assert_eq!(src.pop(&mut arena), None);
}
// Same-tag non-local batch steal places the stolen segment above the destination's existing tasks.
#[test]
fn stack_steal_batch_into_same_tag_splices_above_existing_destination() {
let mut arena = setup_arena(8);
let mut src = IntrusiveStack::new(QUEUE_TAG_READY);
let mut dest = IntrusiveStack::new(QUEUE_TAG_READY);
for i in 0..6 {
src.push(task(i), &mut arena);
}
dest.push(task(6), &mut arena);
dest.push(task(7), &mut arena);
let stolen = src.steal_batch_into_non_local(3, &mut arena, &mut dest);
assert_eq!(stolen, 3);
assert!(!src.has_local_tasks());
assert!(!dest.has_local_tasks());
assert_eq!(dest.pop(&mut arena), Some(task(2)));
assert_eq!(dest.pop(&mut arena), Some(task(1)));
assert_eq!(dest.pop(&mut arena), Some(task(0)));
assert_eq!(dest.pop(&mut arena), Some(task(7)));
assert_eq!(dest.pop(&mut arena), Some(task(6)));
assert_eq!(dest.pop(&mut arena), None);
assert_eq!(src.pop(&mut arena), Some(task(5)));
assert_eq!(src.pop(&mut arena), Some(task(4)));
assert_eq!(src.pop(&mut arena), Some(task(3)));
assert_eq!(src.pop(&mut arena), None);
}
// Owner pops from the top while steals come from the bottom, interleaved.
#[test]
fn stack_work_stealing_semantics() {
let mut arena = setup_arena(10);
let mut stack = IntrusiveStack::new(QUEUE_TAG_READY);
for i in 0..6 {
stack.push(task(i), &mut arena);
}
assert_eq!(stack.pop(&mut arena), Some(task(5)));
let mut stolen = Vec::new();
stack.steal_batch(2, &mut arena, &mut stolen);
assert_eq!(stolen.len(), 2);
assert_eq!(stolen[0], task(0)); assert_eq!(stolen[1], task(1));
stack.push(task(6), &mut arena);
assert_eq!(stack.pop(&mut arena), Some(task(6)));
assert_eq!(stack.pop(&mut arena), Some(task(4)));
assert_eq!(stack.pop(&mut arena), Some(task(3)));
assert_eq!(stack.pop(&mut arena), Some(task(2)));
assert!(stack.is_empty());
}
// A batch steal is equivalent to the same number of single steals.
#[test]
fn metamorphic_batch_steal_matches_repeated_single_steals() {
let mut batch_arena = setup_arena(8);
let mut single_arena = setup_arena(8);
let mut batch_stack = IntrusiveStack::new(QUEUE_TAG_READY);
let mut single_stack = IntrusiveStack::new(QUEUE_TAG_READY);
for i in 0..8 {
batch_stack.push(task(i), &mut batch_arena);
single_stack.push(task(i), &mut single_arena);
}
let mut batch_stolen = Vec::new();
batch_stack.steal_batch(4, &mut batch_arena, &mut batch_stolen);
let mut single_stolen = Vec::new();
while single_stolen.len() < batch_stolen.len() {
single_stolen.push(
single_stack
.steal_one(&mut single_arena)
.expect("single steal should match batch partition"),
);
}
assert_eq!(single_stolen, batch_stolen);
assert_eq!(
pop_all_stack(&mut single_stack, &mut single_arena),
pop_all_stack(&mut batch_stack, &mut batch_arena)
);
}
// Pushing stolen tasks back at the bottom in reverse steal order reconstructs the original stack.
#[test]
fn metamorphic_restoring_stolen_suffix_reconstructs_owner_order() {
let mut baseline_arena = setup_arena(6);
let mut restored_arena = setup_arena(6);
let mut baseline = IntrusiveStack::new(QUEUE_TAG_READY);
let mut restored = IntrusiveStack::new(QUEUE_TAG_READY);
for i in 0..6 {
baseline.push(task(i), &mut baseline_arena);
restored.push(task(i), &mut restored_arena);
}
let expected = pop_all_stack(&mut baseline, &mut baseline_arena);
let mut stolen = Vec::new();
restored.steal_batch(3, &mut restored_arena, &mut stolen);
for task_id in stolen.into_iter().rev() {
restored.push_bottom(task_id, &mut restored_arena);
}
assert_eq!(pop_all_stack(&mut restored, &mut restored_arena), expected);
assert!(!restored.has_local_tasks());
}
// A one-element stack still yields one task to a batch steal.
#[test]
fn stack_steal_from_small() {
let mut arena = setup_arena(2);
let mut stack = IntrusiveStack::new(QUEUE_TAG_READY);
stack.push(task(0), &mut arena);
let mut stolen = Vec::new();
stack.steal_batch(4, &mut arena, &mut stolen);
assert_eq!(stolen.len(), 1);
assert_eq!(stolen[0], task(0));
assert!(stack.is_empty());
}
// Stealing from an empty stack yields nothing.
#[test]
fn stack_steal_from_empty() {
let mut arena = setup_arena(0);
let mut stack = IntrusiveStack::new(QUEUE_TAG_READY);
let mut stolen = Vec::new();
stack.steal_batch(4, &mut arena, &mut stolen);
assert!(stolen.is_empty());
}
// `contains` tracks pushes and pops on the stack.
#[test]
fn stack_contains() {
let mut arena = setup_arena(3);
let mut stack = IntrusiveStack::new(QUEUE_TAG_READY);
stack.push(task(0), &mut arena);
stack.push(task(1), &mut arena);
assert!(stack.contains(task(0), &arena));
assert!(stack.contains(task(1), &arena));
assert!(!stack.contains(task(2), &arena));
let _ = stack.pop(&mut arena); assert!(stack.contains(task(0), &arena));
assert!(!stack.contains(task(1), &arena));
}
// With `local_count` forced to 0, the single-steal fast path still refuses a local task
// and restores the counter.
#[test]
fn stack_steal_one_assume_non_local_rejects_local_when_counter_stale() {
let mut arena = setup_arena(1);
let mut stack = IntrusiveStack::new(QUEUE_TAG_READY);
arena
.get_mut(task(0).arena_index())
.expect("task record missing")
.mark_local();
stack.push(task(0), &mut arena);
assert!(stack.has_local_tasks());
stack.local_count = 0;
assert!(!stack.has_local_tasks());
let stolen = stack.steal_one_assume_non_local(&mut arena);
assert!(stolen.is_none());
assert!(stack.has_local_tasks(), "local_count should self-heal");
assert_eq!(stack.len(), 1);
assert_eq!(stack.pop(&mut arena), Some(task(0)));
}
// With `local_count` forced to 0, the batch fast path still refuses a local bottom task,
// restores the counter and leaves both stacks unchanged.
#[test]
fn stack_steal_batch_into_non_local_rejects_local_when_counter_stale() {
let mut arena = setup_arena(3);
let mut src = IntrusiveStack::new(QUEUE_TAG_READY);
let mut dest = IntrusiveStack::new(QUEUE_TAG_CANCEL);
arena
.get_mut(task(0).arena_index())
.expect("task record missing")
.mark_local();
src.push(task(0), &mut arena);
src.push(task(1), &mut arena);
src.push(task(2), &mut arena);
assert!(src.has_local_tasks());
src.local_count = 0;
assert!(!src.has_local_tasks());
let stolen = src.steal_batch_into_non_local(3, &mut arena, &mut dest);
assert_eq!(stolen, 0, "must not steal local task via fast path");
assert!(src.has_local_tasks(), "local_count should self-heal");
assert_eq!(dest.len(), 0, "destination remains untouched");
assert_eq!(src.pop(&mut arena), Some(task(2)));
assert_eq!(src.pop(&mut arena), Some(task(1)));
assert_eq!(src.pop(&mut arena), Some(task(0)));
assert_eq!(src.pop(&mut arena), None);
}
}