use atomic::{Atomic, Ordering};
use std::{
cell::UnsafeCell,
mem::size_of,
ops::{Deref, DerefMut},
sync::atomic::AtomicUsize,
};
use crate::heap::*;
use crate::{utils::stack::Stack, system::object::HeapObjectHeader};
// 64-bit targets: two 32-bit halves fill one `usize` word in `Age`.
/// Width of each half (`top`, `tag`) of an [`Age`]; two of them fill one `usize`.
#[cfg(target_pointer_width = "64")]
pub type Idx = u32;
/// Default number of slots in a [`GenericTaskQueue`]. The queue's index
/// arithmetic masks with `N - 1`, so this must stay a power of two.
#[cfg(target_pointer_width = "64")]
pub const TASKQUEUE_SIZE: usize = 1 << 17;

// 32-bit targets: two 16-bit halves fill one `usize` word in `Age`.
/// Width of each half (`top`, `tag`) of an [`Age`]; two of them fill one `usize`.
#[cfg(target_pointer_width = "32")]
pub type Idx = u16;
/// Default number of slots in a [`GenericTaskQueue`]. The queue's index
/// arithmetic masks with `N - 1`, so this must stay a power of two.
#[cfg(target_pointer_width = "32")]
pub const TASKQUEUE_SIZE: usize = 1 << 14;
/// Packed `(top, tag)` state of a task queue. It can be viewed either as two
/// [`Idx`] halves (`flags`) or as a single machine word (`data`), so the pair
/// can be loaded, stored, compared and CAS-ed as one unit.
///
/// `#[repr(C)]` guarantees that both fields start at offset 0. The code writes
/// through one view and reads through the other, and the default (Rust)
/// union representation does not promise that layout.
#[repr(C)]
pub union Age {
    // `(top, tag)`: `top` indexes the oldest queued element. `tag` is bumped
    // when `top` wraps and on the owner's slow pop path.
    // NOTE(review): a Rust tuple's field order is not guaranteed by the
    // language; `flags.0` is assumed to be the first half of the word.
    flags: (Idx, Idx),
    // Both halves viewed as one word.
    data: usize,
}
impl Age {
pub fn new_data(data: usize) -> Self {
Self { data }
}
pub fn new_flags(top: Idx, tag: Idx) -> Self {
Self { flags: (top, tag) }
}
pub fn top(&self) -> Idx {
unsafe { self.flags.0 }
}
pub fn tag(&self) -> Idx {
unsafe { self.flags.1 }
}
}
/// Two ages are equal exactly when their packed words are identical.
impl PartialEq for Age {
    fn eq(&self, other: &Self) -> bool {
        // SAFETY: the two `Idx` halves fill the whole word, so `data` is
        // always fully initialized.
        let (lhs, rhs) = unsafe { (self.data, other.data) };
        lhs == rhs
    }
}
impl Eq for Age {}
impl Copy for Age {}
/// `Age` is `Copy`, so cloning is a plain bitwise copy.
impl Clone for Age {
    fn clone(&self) -> Self {
        let copy: Age = *self;
        copy
    }
}
/// Index state of a task queue with `N` slots. `bottom` is the next slot the
/// owner pushes into. `age` packs `top` (the oldest element's slot) with a tag.
/// All indices are reduced modulo `N` with `N - 1` as a mask.
pub struct TaskQueueSuper<const N: usize> {
    // Advanced by `push` and decremented by `pop_local`; `pop_global` only reads it.
    bottom: AtomicUsize,
    // CAS-ed by `pop_global` and the owner's slow pop path; relaxed-stored otherwise.
    age: Age,
}
impl<const N: usize> TaskQueueSuper<N> {
pub const fn max_elems() -> usize {
N - 2
}
pub const MOD_N_MASK: usize = N - 1;
pub const fn dirty_size(bot: usize, top: usize) -> usize {
(bot.wrapping_sub(top)) & Self::MOD_N_MASK
}
pub const fn clean_size(bot: usize, top: usize) -> usize {
let sz = Self::dirty_size(bot, top);
if sz == N - 1 {
0
} else {
sz
}
}
pub const fn assert_not_underflow(dirty_size: usize) {
assert!(dirty_size != N - 1, "invariant");
}
pub const fn increment_index(ind: usize) -> usize {
(ind + 1) & Self::MOD_N_MASK
}
pub const fn decrement_index(ind: usize) -> usize {
(ind.wrapping_sub(1)) & Self::MOD_N_MASK
}
pub fn age_top_relaxed(&self) -> Idx {
unsafe { atomic_load(&self.age.flags.0, Ordering::Relaxed) }
}
pub fn cmpxchg_age(&self, old_age: Age, new_age: Age) -> Age {
atomic_cmpxchg(&self.age, old_age, new_age)
}
pub fn set_age_relaxed(&self, new_age: Age) {
atomic_store(&self.age, new_age, Ordering::Relaxed);
}
pub fn release_set_bottom(&self, new_bottom: usize) {
self.bottom.store(new_bottom, Ordering::Release);
}
pub fn set_bottom_relaxed(&self, new_bottom: usize) {
self.bottom.store(new_bottom, Ordering::Relaxed);
}
pub fn bottom_acquire(&self) -> usize {
self.bottom.load(Ordering::Acquire)
}
pub fn bottom_relaxed(&self) -> usize {
self.bottom.load(Ordering::Relaxed)
}
pub const fn new() -> Self {
Self {
bottom: AtomicUsize::new(0),
age: Age { data: 0 },
}
}
pub fn assert_empty(&self) {
assert!(
self.bottom_relaxed() == self.age_top_relaxed() as usize,
"not empty"
)
}
pub fn is_empty(&self) -> bool {
self.size() == 0
}
pub fn size(&self) -> usize {
Self::clean_size(self.bottom_relaxed(), self.age_top_relaxed() as _)
}
pub fn set_empty(&self) {
self.set_bottom_relaxed(0);
self.set_age_relaxed(Age { data: 0 });
}
pub fn age_relaxed(&self) -> Age {
{
atomic_load(&self.age, Ordering::Relaxed)
}
}
}
/// Outcome of a steal attempt on a task queue.
///
/// Deriving the standard traits lets callers log, copy and compare results
/// (e.g. `assert_eq!`). Each derive only applies when `T` supports that trait.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PopResult<T> {
    /// The queue held no elements.
    Empty,
    /// Another thread changed the queue state first; retrying may succeed.
    Contended,
    /// An element was claimed.
    Success(T),
}
/// Owner- and stealer-side operations of a work-stealing task queue.
pub trait TaskQueue {
    /// Element type held by the queue.
    type E;

    // --- owner side ---
    /// Adds `task` at the owner's end. Returns `false` if it was not stored.
    fn push(&mut self, task: Self::E) -> bool;
    /// Removes an element from the owner's end. Returns `None` when at most
    /// `threshold` elements are queued or no element could be claimed.
    fn pop_local(&self, threshold: usize) -> Option<Self::E>;

    // --- stealer side ---
    /// Tries to remove an element from the opposite (steal) end.
    fn pop_global(&self) -> PopResult<Self::E>;

    // --- queries ---
    /// Number of elements currently queued.
    fn size(&self) -> usize;
    /// `true` when the queue holds no elements.
    fn is_empty(&self) -> bool;

    // --- steal-victim bookkeeping ---
    /// Index of the queue most recently stolen from successfully.
    fn last_stolen_queue_id(&self) -> usize;
    /// Records `id` as the last successful steal victim.
    fn set_last_stolen_queue_id(&mut self, id: usize);
    /// Forgets the last steal victim.
    fn invalidate_last_queue_id(&mut self);
    /// Next pseudo-random number used to pick steal victims.
    fn next_random_queue_id(&mut self) -> i32;
}
/// Fixed-capacity work-stealing deque with `N` slots. The owner pushes and
/// pops at `bottom`; stealers pop at `top` (index state in [`TaskQueueSuper`]).
#[repr(C)]
pub struct GenericTaskQueue<E, const N: usize = TASKQUEUE_SIZE> {
    // Index state, also reachable through `Deref`.
    base: TaskQueueSuper<N>,
    // `malloc`ed buffer of `N` slots. Live elements occupy the slots from
    // `top` up to (excluding) `bottom`, modulo `N`.
    elems: *mut E,
    // Queue last stolen from successfully, or `INVALID_QUEUE_ID` (`usize::MAX`).
    last_stolen_queue_id: usize,
    // State of the Park–Miller generator used to pick steal victims (starts at 17).
    seed: i32,
}
impl<E, const N: usize> GenericTaskQueue<E, N> {
pub const INVALID_QUEUE_ID: usize = usize::MAX;
pub fn new() -> Self {
unsafe {
let elems = libc::malloc(size_of::<E>() * N) as *mut E;
Self {
base: TaskQueueSuper::new(),
elems,
last_stolen_queue_id: Self::INVALID_QUEUE_ID,
seed: 17,
}
}
}
pub fn set_last_stolen_queue_id(&mut self, id: usize) {
self.last_stolen_queue_id = id;
}
pub fn last_stolen_queue_id(&self) -> usize {
self.last_stolen_queue_id
}
pub fn is_last_stolen_queue_id_valid(&self) -> bool {
self.last_stolen_queue_id != Self::INVALID_QUEUE_ID
}
pub fn invalidate_last_queue_id(&mut self) {
self.last_stolen_queue_id = Self::INVALID_QUEUE_ID;
}
#[cold]
fn pop_local_slow(&self, local_bot: usize, old_age: Age) -> bool {
let new_age = Age::new_flags(local_bot as _, old_age.tag() + 1);
if local_bot == old_age.top() as usize {
let temp_age = self.cmpxchg_age(old_age, new_age);
if temp_age == old_age {
return true;
}
}
self.set_age_relaxed(new_age);
false
}
}
impl<E, const N: usize> TaskQueue for GenericTaskQueue<E, N> {
type E = E;
fn size(&self) -> usize {
(**self).size()
}
fn is_empty(&self) -> bool {
self.base.is_empty()
}
fn push(&mut self, t: E) -> bool {
let local_bot = self.bottom_relaxed();
assert!(local_bot < N, "bottom out of range");
let top = self.age_top_relaxed();
let dirty_n_elems = TaskQueueSuper::<N>::dirty_size(local_bot, top as _);
assert!(
dirty_n_elems <= TaskQueueSuper::<N>::max_elems(),
"n_elems out of range"
);
if dirty_n_elems < TaskQueueSuper::<N>::max_elems() {
unsafe {
self.elems.add(local_bot).write(t);
}
self.release_set_bottom(TaskQueueSuper::<N>::increment_index(local_bot));
true
} else {
false
}
}
fn pop_local(&self, threshold: usize) -> Option<E> {
let mut local_bot = self.bottom_relaxed();
let dirty_n_elems = TaskQueueSuper::<N>::dirty_size(local_bot, self.age_top_relaxed() as _);
if dirty_n_elems <= threshold {
return None;
}
local_bot = TaskQueueSuper::<N>::decrement_index(local_bot);
self.set_bottom_relaxed(local_bot);
atomic::fence(Ordering::Acquire);
unsafe {
let t = self.elems.add(local_bot);
let tp = self.age_top_relaxed();
if TaskQueueSuper::<N>::clean_size(local_bot, tp as _) > 0 {
return Some(t.read());
} else {
let res = self.pop_local_slow(local_bot, self.age_relaxed());
if res {
return Some(t.read());
} else {
return None;
}
}
}
}
fn pop_global(&self) -> PopResult<E> {
let old_age = self.age_relaxed();
atomic::fence(Ordering::Acquire);
let local_bot = self.bottom_acquire();
let n_elems = TaskQueueSuper::<N>::clean_size(local_bot, old_age.top() as _);
if n_elems == 0 {
return PopResult::Empty;
}
unsafe {
let t = self.elems.add(old_age.top() as _);
let new_top = TaskQueueSuper::<N>::increment_index(old_age.top() as _);
let new_tag = old_age.tag() + if new_top == 0 { 1 } else { 0 };
let new_age = Age::new_flags(new_top as _, new_tag);
let res_age = self.cmpxchg_age(old_age, new_age);
if res_age == old_age {
return PopResult::Success(t.read());
} else {
return PopResult::Contended;
}
}
}
fn last_stolen_queue_id(&self) -> usize {
self.last_stolen_queue_id
}
fn invalidate_last_queue_id(&mut self) {
self.last_stolen_queue_id = usize::MAX;
}
fn set_last_stolen_queue_id(&mut self, id: usize) {
self.last_stolen_queue_id = id;
}
fn next_random_queue_id(&mut self) -> i32 {
random_park_and_miller(&mut self.seed)
}
}
impl<E, const N: usize> Drop for GenericTaskQueue<E, N> {
fn drop(&mut self) {
unsafe {
libc::free(self.elems as *mut libc::c_void);
}
}
}
impl<E, const N: usize> Deref for GenericTaskQueue<E, N> {
type Target = TaskQueueSuper<N>;
fn deref(&self) -> &Self::Target {
&self.base
}
}
/// A [`GenericTaskQueue`] paired with an overflow [`Stack`]: `push` spills
/// elements that do not fit into the bounded queue onto the stack.
pub struct OverflowTaskQueue<E: Copy, const N: usize = TASKQUEUE_SIZE> {
    // Bounded, stealable part.
    base: GenericTaskQueue<E, N>,
    // Spill area. Only reached by the methods of this type, not by `pop_global`.
    // Wrapped in `UnsafeCell` so it can be mutated through `&self`.
    overflow: UnsafeCell<Stack<E>>,
}
/// Exposes the bounded [`GenericTaskQueue`] part through `Deref`.
impl<E: Copy, const N: usize> Deref for OverflowTaskQueue<E, N> {
    type Target = GenericTaskQueue<E, N>;

    fn deref(&self) -> &GenericTaskQueue<E, N> {
        let Self { base, .. } = self;
        base
    }
}
impl<E: Copy, const N: usize> DerefMut for OverflowTaskQueue<E, N> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.base
}
}
impl<E: Copy, const N: usize> OverflowTaskQueue<E, N> {
    /// Creates an empty bounded queue plus an empty overflow stack.
    pub fn new() -> Self {
        Self {
            base: GenericTaskQueue::new(),
            overflow: UnsafeCell::new(Stack::new(Stack::<E>::DEFAULT_SEGMENT_SIZE, 4, 0)),
        }
    }

    /// `true` when the bounded queue holds no elements (ignores the overflow stack).
    pub fn taskqueue_empty(&self) -> bool {
        self.base.is_empty()
    }

    /// `true` when the overflow stack holds no elements.
    pub fn overflow_empty(&self) -> bool {
        self.overflow_stack().is_empty()
    }

    /// Mutable access to the overflow stack through a shared reference.
    ///
    /// NOTE(review): returning `&mut` from `&self` is only sound if no other
    /// reference to the stack is alive at the same time. Presumably only the
    /// owning thread calls this — callers must uphold that; the type cannot.
    pub fn overflow_stack(&self) -> &mut Stack<E> {
        // SAFETY: relies on the caller contract described above.
        unsafe { &mut *self.overflow.get() }
    }

    /// Pushes into the bounded queue only; returns `false` if it is full.
    pub fn try_push_to_taskqueue(&mut self, elem: E) -> bool {
        self.base.push(elem)
    }

    /// Pops from the overflow stack.
    pub fn pop_overflow(&mut self) -> Option<E> {
        // `&mut self` already proves exclusive access, so use the safe
        // accessor instead of manufacturing `&mut` from `&self`.
        self.overflow.get_mut().pop()
    }
}
impl<E: Copy, const N: usize> TaskQueue for OverflowTaskQueue<E, N> {
    type E = E;

    /// Pushes into the bounded queue and spills to the overflow stack when it
    /// is full, so this always reports success.
    fn push(&mut self, elem: E) -> bool {
        if self.base.push(elem) {
            return true;
        }
        // `E: Copy`, so `elem` is still usable after the failed attempt.
        self.overflow_stack().push(elem);
        true
    }

    /// Owner-side pop; only the bounded queue is consulted.
    fn pop_local(&self, threshold: usize) -> Option<E> {
        self.base.pop_local(threshold)
    }

    /// Steal from the bounded queue; the overflow stack is never stolen from.
    fn pop_global(&self) -> PopResult<E> {
        self.base.pop_global()
    }

    /// Size of the bounded queue only (excludes the overflow stack).
    fn size(&self) -> usize {
        self.base.size()
    }

    /// Empty only when both the bounded queue and the overflow stack are empty.
    fn is_empty(&self) -> bool {
        if !self.taskqueue_empty() {
            return false;
        }
        self.overflow_empty()
    }

    fn last_stolen_queue_id(&self) -> usize {
        self.base.last_stolen_queue_id()
    }

    fn set_last_stolen_queue_id(&mut self, id: usize) {
        self.base.set_last_stolen_queue_id(id);
    }

    fn invalidate_last_queue_id(&mut self) {
        self.base.invalidate_last_queue_id();
    }

    fn next_random_queue_id(&mut self) -> i32 {
        self.base.next_random_queue_id()
    }
}
/// Size queries over a set of task queues.
pub trait TaskQueueSetSuper {
    /// Number of queue slots in the set.
    fn size(&self) -> usize;
    /// Total number of tasks currently held across the set's queues.
    fn tasks(&self) -> usize;
}
/// Queue registration and stealing over a set of `T` queues.
pub trait TaskQueueSetSuperImpl<T: TaskQueue>: TaskQueueSetSuper {
    /// Installs `queue` as queue number `i`.
    fn register_queue(&self, i: usize, queue: *mut T);
    /// Raw pointer registered as queue number `queue_num`.
    fn queue(&self, queue_num: usize) -> *mut T;
    /// One steal attempt on behalf of queue `queue_num`, sampling two victims.
    fn steal_best_of_2(&self, queue_num: usize) -> PopResult<T::E>;
    /// Repeated steal attempts on behalf of queue `queue_num`.
    fn steal(&self, queue_num: usize) -> Option<T::E>;
}
/// Fixed-size registry of `n` task-queue pointers that workers steal from.
/// The set does not own the queues; its `Drop` only frees the pointer array.
pub struct GenericTaskQueueSet<T: TaskQueue> {
    // C-allocated array of `n` queue pointers; a slot is null until `register_queue` fills it.
    queues: *mut *mut T,
    // Number of slots in `queues`.
    n: usize,
}
impl<T: TaskQueue> GenericTaskQueueSet<T> {
    /// Creates a set with `n` slots, all initially unregistered (null).
    ///
    /// # Panics
    /// Panics if `n > 0` and the slot array cannot be allocated.
    pub fn new(n: usize) -> Self {
        // `calloc` zero-fills, and an all-zero raw pointer is null in Rust.
        // Unlike `malloc(n * size)`, it also fails cleanly instead of
        // overflowing the size computation. The memory is freed with `free`
        // in `Drop`, which is valid for `calloc`.
        // SAFETY: plain C allocation; validated below.
        let queues = unsafe { libc::calloc(n, std::mem::size_of::<*mut T>()) as *mut *mut T };
        // `calloc(0, _)` may legitimately return null; no slot is ever read then.
        assert!(n == 0 || !queues.is_null(), "failed to allocate task queue set");
        Self { queues, n }
    }
}
impl<T: TaskQueue> Drop for GenericTaskQueueSet<T> {
    /// Frees the pointer array only; the registered queues are left untouched.
    fn drop(&mut self) {
        let raw = self.queues.cast::<libc::c_void>();
        // SAFETY: `queues` was obtained from the C allocator in `new`.
        unsafe { libc::free(raw) };
    }
}
impl<T: TaskQueue> TaskQueueSetSuper for GenericTaskQueueSet<T> {
    /// Sum of `size()` over all registered queues.
    ///
    /// Unregistered (null) slots count as empty. Previously they were
    /// dereferenced, which is undefined behavior.
    fn tasks(&self) -> usize {
        (0..self.n)
            // SAFETY: `j < n`, and `new` allocated and null-initialized `n` slots.
            .map(|j| unsafe { self.queues.add(j).read() })
            .filter(|queue| !queue.is_null())
            // SAFETY: non-null slots were installed by `register_queue`;
            // callers must keep those queues alive while the set is used.
            .map(|queue| unsafe { (*queue).size() })
            .sum()
    }

    /// Number of queue slots.
    fn size(&self) -> usize {
        self.n
    }
}
/// Advances `seed0` with the Park–Miller "minimal standard" generator
/// (multiplier 16807, modulus 2^31 - 1) and returns the new value.
///
/// Schrage's decomposition keeps every intermediate product inside `i32` for
/// non-negative seeds. A seed of 0 maps to 2^31 - 1 (a fixed point).
#[inline]
fn random_park_and_miller(seed0: &mut i32) -> i32 {
    const MULTIPLIER: i32 = 16807;
    const MODULUS: i32 = 2147483647;
    const QUOTIENT: i32 = 127773; // MODULUS / MULTIPLIER
    const REMAINDER: i32 = 2836; // MODULUS % MULTIPLIER

    let current = *seed0;
    let candidate = MULTIPLIER * (current % QUOTIENT) - REMAINDER * (current / QUOTIENT);
    let next = if candidate > 0 { candidate } else { candidate + MODULUS };
    *seed0 = next;
    next
}
impl<T: TaskQueue> TaskQueueSetSuperImpl<T> for GenericTaskQueueSet<T> {
    /// Installs `queue` as queue number `i`.
    ///
    /// # Panics
    /// Panics if `i` is not below the number of slots (previously an
    /// out-of-bounds write).
    fn register_queue(&self, i: usize, queue: *mut T) {
        assert!(i < self.n, "queue index out of range");
        // SAFETY: `i < n` and `queues` has `n` slots.
        unsafe {
            self.queues.add(i).write(queue);
        }
    }

    /// Pointer registered as queue number `queue_num` (null if unregistered).
    fn queue(&self, queue_num: usize) -> *mut T {
        debug_assert!(queue_num < self.n, "queue index out of range");
        // SAFETY: callers pass indices below `n` (checked in debug builds).
        unsafe { self.queues.add(queue_num).read() }
    }

    /// One steal attempt on behalf of queue `queue_num`.
    ///
    /// With more than two queues, two distinct victims are sampled and the
    /// fuller one is tried. The victim is remembered on success and forgotten
    /// otherwise. With two queues, the other queue is tried. With fewer, the
    /// result is `Empty`.
    fn steal_best_of_2(&self, queue_num: usize) -> PopResult<T::E> {
        if self.n > 2 {
            // SAFETY (whole branch): every index used is `< n`, and registered
            // queue pointers stay valid while the set is in use.
            unsafe {
                let local_queue = self.queue(queue_num);
                // First victim: the last successful one, or a random other queue.
                // `usize::MAX` is the "no remembered victim" sentinel.
                let mut k1 = (*local_queue).last_stolen_queue_id();
                if k1 == usize::MAX {
                    k1 = queue_num;
                    while k1 == queue_num {
                        k1 = (*local_queue).next_random_queue_id() as usize % self.n;
                    }
                }
                // Second victim: random, distinct from ours and from `k1`.
                let mut k2 = queue_num;
                while k2 == queue_num || k2 == k1 {
                    k2 = (*local_queue).next_random_queue_id() as usize % self.n;
                }
                let sz1 = (*self.queue(k1)).size();
                let sz2 = (*self.queue(k2)).size();
                // Steal from the fuller queue. If both look empty, skip
                // `pop_global` (and its fence) entirely, as HotSpot does.
                let (sel_k, result) = if sz2 > sz1 {
                    (k2, (*self.queue(k2)).pop_global())
                } else if sz1 > 0 {
                    (k1, (*self.queue(k1)).pop_global())
                } else {
                    (k1, PopResult::Empty)
                };
                if matches!(result, PopResult::Success(_)) {
                    (*local_queue).set_last_stolen_queue_id(sel_k);
                } else {
                    (*local_queue).invalidate_last_queue_id();
                }
                result
            }
        } else if self.n == 2 {
            let other = (queue_num + 1) % 2;
            // SAFETY: `other < 2 == n`; registered queues stay valid while in use.
            unsafe { (*self.queue(other)).pop_global() }
        } else {
            // A lone queue (or none) has nobody to steal from. Its own slot is
            // not read here, avoiding an out-of-bounds read when `n == 0`.
            PopResult::Empty
        }
    }

    /// Up to `2 * n` steal attempts; both `Empty` and `Contended` retry.
    fn steal(&self, queue_num: usize) -> Option<T::E> {
        (0..2 * self.n).find_map(|_| match self.steal_best_of_2(queue_num) {
            PopResult::Success(task) => Some(task),
            PopResult::Empty | PopResult::Contended => None,
        })
    }
}
/// Task pairing an object pointer with an index.
/// NOTE(review): `idx` presumably selects a position/chunk inside the array at `obj` — confirm with callers.
#[derive(Clone, Copy, PartialEq, Eq)]
pub struct ObjArrayTask {
    // Object the task refers to.
    obj: *mut HeapObjectHeader,
    // Index stored with the task.
    idx: usize
}
impl ObjArrayTask {
#[inline]
pub const fn obj(&self) -> *mut HeapObjectHeader {
self.obj
}
#[inline]
pub const fn idx(&self) -> usize {
self.idx
}
#[inline]
pub const fn new(obj: *mut HeapObjectHeader, idx: usize) -> Self {
Self { obj, idx }
}
}
/// Task describing a partially scanned array, identified by its source object.
#[derive(Clone, Copy, PartialEq, Eq)]
pub struct PartialArrayScanTask {
    // Source array object.
    src: *mut HeapObjectHeader
}
impl PartialArrayScanTask {
#[inline]
pub const fn src(&self) -> *mut HeapObjectHeader {
self.src
}
#[inline]
pub const fn new(src: *mut HeapObjectHeader) -> Self {
Self { src }
}
}
/// Pointer-sized task word: a `HeapObjectHeader` address whose low tag bit
/// marks it as either a plain object or a partial-array scan task.
// NOTE(review): unlike `ObjArrayTask`/`PartialArrayScanTask`, this type derives
// no `Clone`/`Copy`, so it cannot be stored in an `OverflowTaskQueue` (which
// requires `E: Copy`) — confirm whether that is intended.
pub struct ScannerTask {
    // Tagged address: object address plus tag (see `encode`).
    p: *mut u8
}
impl ScannerTask {
    /// Tag of a plain object pointer.
    pub const OOP_TAG: usize = 0;
    /// Tag of a partial-array scan task.
    pub const PARTIAL_ARRAY_TAG: usize = 1;
    /// Number of low address bits reserved for the tag.
    pub const TAG_SIZE: usize = 1;
    /// Alignment an encoded address needs so its tag bits are free.
    pub const TAG_ALIGNMENT: usize = 1 << Self::TAG_SIZE;
    /// Mask selecting the tag bits of an encoded word.
    pub const TAG_MASK: usize = Self::TAG_ALIGNMENT - 1;

    /// Packs `obj` and `tag` into one word by adding `tag` to the address.
    ///
    /// `obj` must be aligned to [`Self::TAG_ALIGNMENT`] and `tag` must fit in
    /// [`Self::TAG_MASK`]. Otherwise the tag would corrupt the address, so
    /// both conditions are checked in debug builds.
    pub fn encode(obj: *mut HeapObjectHeader, tag: usize) -> Self {
        debug_assert!(tag <= Self::TAG_MASK, "tag does not fit in the tag bits");
        debug_assert!(
            ((obj as usize) & Self::TAG_MASK) == 0,
            "pointer is not aligned for tagging"
        );
        Self { p: (obj as usize + tag) as *mut u8 }
    }

    /// The encoded word as stored.
    pub fn raw_value(&self) -> *mut u8 {
        self.p
    }

    /// `true` when the word's tag bits equal `tag`.
    pub fn has_tag(&self, tag: usize) -> bool {
        (self.p as usize & Self::TAG_MASK) == tag
    }

    /// Recovers the address by subtracting `tag`.
    ///
    /// Decoding with a tag the word does not carry yields a wrong pointer, so
    /// that case is checked in debug builds.
    pub fn decode(&self, tag: usize) -> *mut HeapObjectHeader {
        debug_assert!(self.has_tag(tag), "decoding with a tag the task does not carry");
        (self.p as usize - tag) as *mut HeapObjectHeader
    }

    /// Encodes a plain object pointer.
    pub fn new(obj: *mut HeapObjectHeader) -> Self {
        Self::encode(obj, Self::OOP_TAG)
    }

    /// Encodes a partial-array scan task.
    pub fn partial_array(obj: PartialArrayScanTask) -> Self {
        Self::encode(obj.src(), Self::PARTIAL_ARRAY_TAG)
    }

    /// `true` for a plain object pointer.
    pub fn is_oop_ptr(&self) -> bool {
        self.has_tag(Self::OOP_TAG)
    }

    /// `true` for a partial-array scan task.
    pub fn is_partial_array_ptr(&self) -> bool {
        self.has_tag(Self::PARTIAL_ARRAY_TAG)
    }

    /// Decodes a plain object pointer.
    pub fn to_oop_ptr(&self) -> *mut HeapObjectHeader {
        self.decode(Self::OOP_TAG)
    }

    /// Decodes a partial-array scan task.
    pub fn to_partial_array_task(&self) -> PartialArrayScanTask {
        PartialArrayScanTask::new(self.decode(Self::PARTIAL_ARRAY_TAG))
    }
}
/// Hook consulted during termination.
/// NOTE(review): returning `true` presumably tells a worker waiting to
/// terminate to stop waiting — confirm against the terminator implementation.
pub trait TerminatorTerminator {
    fn should_exit_termination(&self) -> bool;
}