use crossbeam_deque::{Steal, Stealer, Worker};
/// A queue of `u64` process ids (pids) backed by a crossbeam-deque [`Worker`].
///
/// Pids are added and removed through [`RunQueue::push`] and
/// [`RunQueue::pop`]; other queues can take work from this one through the
/// handle returned by [`RunQueue::stealer`].
pub struct RunQueue {
/// Underlying deque, created in LIFO mode by [`RunQueue::new`].
worker: Worker<u64>,
}
impl RunQueue {
    /// Upper bound on steal attempts made by one [`RunQueue::steal_half_from`] call.
    ///
    /// [`Steal::Retry`] signals that the operation should be tried again, so a
    /// few immediate re-attempts are made before giving up. The bound keeps the
    /// call from spinning indefinitely against a busy victim.
    const MAX_STEAL_ATTEMPTS: usize = 4;

    /// Creates an empty run queue whose `pop` returns pids in LIFO order.
    #[must_use]
    pub fn new() -> Self {
        Self {
            worker: Worker::new_lifo(),
        }
    }

    /// Adds `pid` to the queue.
    pub fn push(&self, pid: u64) {
        self.worker.push(pid);
    }

    /// Removes and returns the next pid, or `None` when the queue is empty.
    ///
    /// The queue is LIFO, so the most recently pushed pid comes out first.
    #[must_use]
    pub fn pop(&self) -> Option<u64> {
        self.worker.pop()
    }

    /// Returns the number of pids currently in the queue.
    #[must_use]
    pub fn len(&self) -> usize {
        self.worker.len()
    }

    /// Returns `true` when the queue holds no pids.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.worker.is_empty()
    }

    /// Creates a [`Stealer`] handle that lets another queue take pids from
    /// this one via [`RunQueue::steal_half_from`].
    #[must_use]
    pub fn stealer(&self) -> Stealer<u64> {
        self.worker.stealer()
    }

    /// Moves up to half of the victim's queued pids into this queue.
    ///
    /// Returns the number of pids this queue grew by. The result is `0` when
    /// the victim holds fewer than two pids, when the victim reports
    /// [`Steal::Empty`], or when it still reports [`Steal::Retry`] after
    /// [`Self::MAX_STEAL_ATTEMPTS`] attempts.
    ///
    /// The count is derived from this queue's length before and after the
    /// steal, so it can under-report if another thread steals from this queue
    /// at the same moment.
    pub fn steal_half_from(&self, victim: &Stealer<u64>) -> usize {
        for _ in 0..Self::MAX_STEAL_ATTEMPTS {
            // Re-read the victim's length on every attempt: it may have changed
            // since the previous one. Integer division yields zero for a victim
            // holding 0 or 1 pids, in which case there is nothing worth taking.
            let limit = victim.len() / 2;
            if limit == 0 {
                return 0;
            }

            let before = self.worker.len();
            match victim.steal_batch_with_limit_and_pop(&self.worker, limit) {
                Steal::Success(pid) => {
                    // One stolen pid is handed back to the caller instead of
                    // being left in `self.worker`; re-queue it so it is not lost.
                    self.worker.push(pid);
                    return self.worker.len().saturating_sub(before);
                }
                Steal::Empty => return 0,
                // Transient contention: try again instead of reporting the
                // victim as having nothing to steal.
                Steal::Retry => std::hint::spin_loop(),
            }
        }
        0
    }
}
/// The default run queue is the empty queue produced by [`RunQueue::new`].
impl Default for RunQueue {
/// Delegates to [`RunQueue::new`].
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use super::RunQueue;

    /// A single pushed pid comes back from `pop`, after which the queue is empty.
    #[test]
    fn push_then_pop_returns_same_process() {
        let queue = RunQueue::new();
        queue.push(42);
        assert_eq!(queue.pop(), Some(42));
        assert_eq!(queue.pop(), None);
    }

    /// The owner pops pids in reverse push order.
    #[test]
    fn owner_pop_is_lifo() {
        let queue = RunQueue::new();
        queue.push(1);
        queue.push(2);
        queue.push(3);
        assert_eq!(queue.pop(), Some(3));
        assert_eq!(queue.pop(), Some(2));
        assert_eq!(queue.pop(), Some(1));
    }

    /// Stealing from a ten-item victim moves roughly half of it and loses nothing.
    #[test]
    fn steal_half_from_ten_takes_approximately_five() {
        let victim = RunQueue::new();
        for pid in 0..10 {
            victim.push(pid);
        }
        let stealer = victim.stealer();
        let thief = RunQueue::new();
        let stolen = thief.steal_half_from(&stealer);
        assert!((4..=6).contains(&stolen), "stole {stolen} items");
        assert!(!thief.is_empty());
        assert!(!victim.is_empty());
        // Conservation: the reported count matches what the thief now holds,
        // and every pid is still in exactly one of the two queues.
        assert_eq!(thief.len(), stolen);
        assert_eq!(victim.len() + thief.len(), 10);
    }

    /// An empty victim yields nothing.
    #[test]
    fn steal_from_empty_queue_returns_nothing() {
        let victim = RunQueue::new();
        let thief = RunQueue::new();
        assert_eq!(thief.steal_half_from(&victim.stealer()), 0);
        assert!(thief.is_empty());
    }

    /// A victim with a single pid keeps it.
    #[test]
    fn steal_from_single_item_queue_returns_nothing() {
        let victim = RunQueue::new();
        victim.push(7);
        let thief = RunQueue::new();
        assert_eq!(thief.steal_half_from(&victim.stealer()), 0);
        assert_eq!(victim.len(), 1);
        assert!(thief.is_empty());
    }

    /// Owner pops and a concurrent steal together account for every pid exactly once.
    #[test]
    fn push_and_steal_from_different_threads_do_not_race() {
        let owner = RunQueue::new();
        for pid in 0..100 {
            owner.push(pid);
        }
        let stealer = owner.stealer();
        let thief_thread = std::thread::spawn(move || {
            let thief = RunQueue::new();
            let _stolen = thief.steal_half_from(&stealer);
            let mut items = Vec::new();
            while let Some(pid) = thief.pop() {
                items.push(pid);
            }
            items
        });
        let mut owner_items = Vec::new();
        while let Some(pid) = owner.pop() {
            owner_items.push(pid);
        }
        let thief_items = match thief_thread.join() {
            Ok(items) => items,
            // Re-raise a panic from the thief thread so its message is reported.
            Err(payload) => std::panic::resume_unwind(payload),
        };
        let all: HashSet<u64> = owner_items
            .iter()
            .chain(thief_items.iter())
            .copied()
            .collect();
        // No pid was handed to both threads.
        assert_eq!(all.len(), owner_items.len() + thief_items.len());
        // No pid was dropped: both threads drained their queues, so together
        // they must hold exactly the pids that were pushed.
        let expected: HashSet<u64> = (0..100).collect();
        assert_eq!(all, expected);
    }
}