use std::marker::PhantomData;
use std::sync::atomic::{AtomicU8, AtomicUsize, Ordering};
use std::sync::Arc;
use crossbeam::utils::CachePadded;
use crate::arch::ArchConfig;
use crate::slab::HugeSlab;
/// Waiting strategy the slayer core thread applies while no request is pending.
///
/// The discriminant is stored in an `AtomicU8` and matched numerically by the
/// worker loop, so the numeric values must stay stable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SpinPolicy {
    /// Poll continuously, issuing only CPU spin-loop hints between polls.
    Busy = 0,
    /// Issue a burst of 100 spin-loop hints, then yield the thread before polling again.
    HybridYield = 1,
    /// Sleep for 10 microseconds between polls.
    Sleep = 2,
}
/// Point-in-time copy of the counters maintained by the slayer core thread.
pub struct SlayerStats {
    /// Requests the worker picked up (counted before the index is validated).
    pub total_reads: u64,
    /// Requests answered from a consistent seqlock snapshot.
    pub wins: u64,
    /// Requests rejected because the index fell outside the replica region.
    pub misses: u64,
}
// Handshake states shared between `read` (the requester) and the slayer core thread.

/// No request outstanding; `read` resets the state to this after consuming a result.
const STATE_IDLE: u8 = 0;
/// A request index has been posted and the worker should service it.
const STATE_READY: u8 = 1;
/// The worker has finished the request (successfully or as a miss).
const STATE_DONE: u8 = 2;

/// Largest `size_of::<T>()` (in bytes) accepted by `UltraSlayer::new`.
pub const MAX_T_SIZE: usize = 64;
/// Bytes reserved after the replica regions to hold the result slot.
const RESULT_SLOT_BACK_OFFSET: usize = 2 * MAX_T_SIZE;
/// Sequence counter guarding the replicated data region.
///
/// The counter is even while the data is stable and odd while a write is in
/// progress. A reader snapshots the counter, copies the data, and accepts the
/// copy only if the counter was even and is unchanged afterwards.
///
/// NOTE(review): writers bump the counter with `fetch_add`, so two overlapping
/// writers could leave it even while both are still writing. Writers appear to
/// need external serialization — confirm against callers.
struct SeqLock {
    seq: AtomicUsize,
}

impl SeqLock {
    /// Creates a lock in the stable (even) state.
    fn new() -> Self {
        Self {
            seq: AtomicUsize::new(0),
        }
    }

    /// Marks the start of a write by making the counter odd.
    #[inline(always)]
    fn write_begin(&self) {
        self.seq.fetch_add(1, Ordering::Release);
        // A release RMW only orders *earlier* accesses before it. The fence is
        // what keeps the data stores that follow from becoming visible before
        // the counter turns odd.
        std::sync::atomic::fence(Ordering::Release);
    }

    /// Marks the end of a write by making the counter even again.
    ///
    /// The release increment orders all preceding data stores before the
    /// counter update.
    #[inline(always)]
    fn write_end(&self) {
        self.seq.fetch_add(1, Ordering::Release);
    }

    /// Snapshots the counter before the reader copies the data.
    #[inline(always)]
    fn read_begin(&self) -> usize {
        self.seq.load(Ordering::Acquire)
    }

    /// Returns `true` if the data copied since `read_begin` returned `start`
    /// is consistent: `start` was even and the counter has not moved.
    #[inline(always)]
    fn read_end(&self, start: usize) -> bool {
        if start & 1 != 0 {
            // A write was already in progress when the snapshot was taken.
            return false;
        }
        // An acquire load only orders *later* accesses after it. The fence is
        // what keeps the preceding data loads from being satisfied after the
        // validating load below.
        std::sync::atomic::fence(Ordering::Acquire);
        self.seq.load(Ordering::Acquire) == start
    }
}
/// Replicated, seqlock-protected element store served by a dedicated worker thread.
///
/// Each element is written to every replica region of the slab; reads are
/// posted to the worker through `request_idx`/`state` and the answer is
/// returned through a result slot placed after the last replica.
pub struct UltraSlayer<T> {
    /// Backing memory: `num_replicas` replica regions followed by the result slot.
    slab: Arc<HugeSlab>,
    /// Element index of the request currently posted by `read`.
    request_idx: Arc<CachePadded<AtomicUsize>>,
    /// Request handshake state (idle / ready / done).
    state: Arc<CachePadded<AtomicU8>>,
    /// Current `SpinPolicy`, stored as its `u8` discriminant.
    spin_policy: Arc<AtomicU8>,
    /// Byte offset of the result slot inside the slab.
    result_slot_offset: usize,
    /// Number of replica regions every insert writes to.
    num_replicas: usize,
    /// Platform configuration; supplies the byte stride between replicas.
    config: ArchConfig,
    /// `size_of::<T>()`, cached at construction.
    elem_size: usize,
    /// Guards the replica regions against torn reads.
    seqlock: Arc<SeqLock>,
    /// Element capacity requested at construction.
    #[allow(dead_code)]
    capacity: usize,
    _marker: PhantomData<T>,
    /// Requests picked up by the worker.
    total_reads: Arc<AtomicUsize>,
    /// Requests answered from a consistent snapshot.
    wins: Arc<AtomicUsize>,
    /// Requests rejected as out of range.
    misses: Arc<AtomicUsize>,
}
impl<T: Copy + Send + Sync + 'static> UltraSlayer<T> {
/// Allocates a slab holding `num_replicas` replica regions plus a result slot.
///
/// `capacity` is the number of `T` elements each replica must be able to hold.
///
/// # Panics
/// - if `num_replicas < 2`;
/// - if `size_of::<T>()` exceeds `MAX_T_SIZE`;
/// - if `capacity * size_of::<T>()` overflows or does not fit within the
///   platform's replica stride;
/// - if the total slab size overflows `usize`.
pub fn new(num_replicas: usize, capacity: usize) -> Self {
    assert!(num_replicas >= 2, "num_replicas must be >= 2 for hedging");
    let elem_size = std::mem::size_of::<T>();
    assert!(elem_size <= MAX_T_SIZE, "T is {elem_size} bytes; MAX_T_SIZE is {MAX_T_SIZE}");
    let config = ArchConfig::for_platform();

    // Checked: a wrapped product would make the assertion below pass for a
    // capacity that does not actually fit in one replica region.
    let payload_bytes = capacity
        .checked_mul(elem_size)
        .expect("capacity * elem_size overflow");
    assert!(
        config.replica_offset >= payload_bytes,
        "replica_offset ({}) < capacity * elem_size ({}).",
        config.replica_offset, payload_bytes
    );

    // Checked: a wrapped product would under-allocate the slab while the
    // replica offsets used later assume the full size.
    let data_size = config
        .replica_offset
        .checked_mul(num_replicas)
        .expect("slab size overflow");
    let slab_size = data_size.checked_add(RESULT_SLOT_BACK_OFFSET).expect("slab size overflow");
    let slab = Arc::new(HugeSlab::new(slab_size));
    // The result slot sits directly after the last replica region.
    let result_slot_offset = data_size;

    Self {
        slab,
        request_idx: Arc::new(CachePadded::new(AtomicUsize::new(0))),
        state: Arc::new(CachePadded::new(AtomicU8::new(STATE_IDLE))),
        spin_policy: Arc::new(AtomicU8::new(SpinPolicy::Busy as u8)),
        result_slot_offset,
        num_replicas,
        config,
        elem_size,
        seqlock: Arc::new(SeqLock::new()),
        capacity,
        _marker: PhantomData,
        total_reads: Arc::new(AtomicUsize::new(0)),
        wins: Arc::new(AtomicUsize::new(0)),
        misses: Arc::new(AtomicUsize::new(0)),
    }
}
/// Builds a `Slice` view over the start of the slab (the first replica
/// region) spanning `capacity` elements of `T`.
///
/// # Safety
/// NOTE(review): the exact contract of `Slice::from_raw_parts` is not visible
/// from here. The view aliases memory that `insert` and the worker thread
/// access through raw pointers, so callers presumably must avoid concurrent
/// conflicting access — confirm against `crate::slice`.
#[cfg(feature = "slice")]
pub unsafe fn slice(&self) -> crate::slice::Slice<'_, T> {
    let first_replica = self.slab.ptr() as *mut T;
    let len = self.capacity;
    crate::slice::Slice::from_raw_parts(first_replica, len)
}
/// Changes how the worker thread waits between requests.
///
/// The new policy is picked up the next time the worker polls for a request.
pub fn set_spin_policy(&self, policy: SpinPolicy) {
    let raw = policy as u8;
    self.spin_policy.store(raw, Ordering::SeqCst);
}
/// Returns a copy of the worker's counters.
///
/// Each counter is loaded independently with relaxed ordering, so the three
/// values are not guaranteed to belong to the same instant.
pub fn stats(&self) -> SlayerStats {
    let total_reads = self.total_reads.load(Ordering::Relaxed) as u64;
    let wins = self.wins.load(Ordering::Relaxed) as u64;
    let misses = self.misses.load(Ordering::Relaxed) as u64;
    SlayerStats {
        total_reads,
        wins,
        misses,
    }
}
/// Pins the calling thread to the core whose id equals `core_id`.
///
/// The result of the affinity call itself is not inspected.
///
/// # Panics
/// Panics if the core list cannot be obtained or contains no core with the
/// requested id.
pub fn pin_to_core(&self, core_id: usize) {
    let available = core_affinity::get_core_ids().expect("core_affinity failed");
    match available.into_iter().find(|candidate| candidate.id == core_id) {
        Some(core) => {
            core_affinity::set_for_current(core);
        }
        None => panic!("cpu_id {core_id} not found"),
    }
}
/// Writes `value` at element `index` in every replica region.
///
/// The write is bracketed by the seqlock so the worker thread never serves a
/// torn element.
///
/// NOTE(review): overlapping `insert` calls from different threads are not
/// serialized here; callers presumably guarantee a single writer — confirm.
///
/// # Panics
/// Panics if the byte offset overflows or if the element would not fit inside
/// one replica region. All checks run *before* the write section opens, so a
/// failed call leaves the seqlock and the slab untouched.
pub fn insert(&self, index: usize, value: T) {
    let size = self.elem_size;
    let stride = self.config.replica_offset;
    let slab_size = self.slab.size();

    // Validate up front: panicking between `write_begin` and `write_end`
    // would leave the sequence counter odd and stall every reader forever.
    let slot_start = index.checked_mul(size).expect("index overflow");
    let slot_end = slot_start.checked_add(size).expect("index overflow");
    // The element must stay inside its own replica region; otherwise the
    // write would land in the next replica or in the result slot. This is
    // the same bound the worker thread applies before serving a read.
    assert!(slot_end <= stride, "insert out of bounds");
    let last_end = stride
        .checked_mul(self.num_replicas.saturating_sub(1))
        .and_then(|o| o.checked_add(slot_end))
        .expect("replica offset overflow");
    assert!(last_end <= slab_size, "insert out of bounds");

    let src = &value as *const T as *const u8;
    self.seqlock.write_begin();
    for r in 0..self.num_replicas {
        // Cannot overflow: the largest offset (last replica) was checked above.
        let offset = stride * r + slot_start;
        // SAFETY: `offset + size <= last_end <= slab_size`, so the
        // destination range lies inside the slab; `src` points at a live
        // local `T` of exactly `size` bytes and cannot overlap the slab.
        unsafe {
            let dst = self.slab.ptr().add(offset);
            std::ptr::copy_nonoverlapping(src, dst, size);
        }
    }
    std::sync::atomic::fence(Ordering::Release);
    self.seqlock.write_end();
}
pub fn spawn_slayer_core(&self, cpu_id: usize) {
let request_idx = Arc::clone(&self.request_idx);
let state = Arc::clone(&self.state);
let spin_policy = Arc::clone(&self.spin_policy);
let slab = Arc::clone(&self.slab);
let replica_offset = self.config.replica_offset;
let result_slot_offset = self.result_slot_offset;
let size = self.elem_size;
let num_replicas = self.num_replicas;
let seqlock = Arc::clone(&self.seqlock);
let total_reads = Arc::clone(&self.total_reads);
let wins = Arc::clone(&self.wins);
let misses = Arc::clone(&self.misses);
std::thread::Builder::new()
.name(format!("slayer-core-{cpu_id}"))
.spawn(move || {
let core_ids = core_affinity::get_core_ids().expect("core_affinity failed");
let target = core_ids.into_iter().find(|c| c.id == cpu_id)
.unwrap_or_else(|| panic!("cpu_id {cpu_id} not found"));
core_affinity::set_for_current(target);
let slab_ptr = slab.ptr();
let slab_size = slab.size();
unsafe {
for i in (0..slab_size).step_by(4096) {
std::ptr::write_volatile(slab_ptr.add(i), 0);
}
}
loop {
while state.load(Ordering::SeqCst) != STATE_READY {
let policy = spin_policy.load(Ordering::Relaxed);
match policy {
0 => { std::hint::spin_loop();
#[cfg(target_arch = "x86_64")]
{ std::arch::x86_64::_mm_pause(); }
#[cfg(target_arch = "aarch64")]
{ std::arch::asm!("yield"); }
},
1 => { for _ in 0..100 { std::hint::spin_loop(); }
std::thread::yield_now();
},
_ => { std::thread::sleep(std::time::Duration::from_micros(10));
}
}
}
let idx = request_idx.load(Ordering::SeqCst);
total_reads.fetch_add(1, Ordering::Relaxed);
let max_offset = replica_offset
.saturating_mul(num_replicas - 1)
.saturating_add(idx.saturating_mul(size))
.saturating_add(size);
if max_offset > result_slot_offset {
state.store(STATE_DONE, Ordering::SeqCst);
misses.fetch_add(1, Ordering::Relaxed);
continue;
}
unsafe {
let base = slab.ptr();
let words = size.div_ceil(8).min(8);
loop {
let seq_start = seqlock.read_begin();
for r in 0..num_replicas.min(2) {
let addr = base.add(replica_offset * r + idx * size);
#[cfg(target_arch = "x86_64")]
std::arch::x86_64::_mm_prefetch(addr as *const i8, std::arch::x86_64::_MM_HINT_T0);
#[cfg(target_arch = "aarch64")]
std::arch::asm!("prfm pldl1, [{0}]", in(reg) addr);
}
let mut replica_bufs: [[u64; 8]; 2] = [[0u64; 8]; 2];
for r in 0..num_replicas.min(2) {
let addr = base.add(replica_offset * r + idx * size) as *const u64;
for w in 0..words {
replica_bufs[r][w] = std::ptr::read_volatile(addr.add(w));
}
}
if seqlock.read_end(seq_start) {
let result_dst = base.add(result_slot_offset) as *mut u64;
for w in 0..words {
std::ptr::write_volatile(result_dst.add(w), replica_bufs[0][w]);
}
wins.fetch_add(1, Ordering::Relaxed);
break;
}
}
}
state.store(STATE_DONE, Ordering::SeqCst);
}
})
.expect("failed to spawn slayer core thread");
}
/// Requests element `index` from the worker thread and spins until it answers.
///
/// The index is posted, the state is set to READY, and the caller busy-waits
/// for DONE before copying a `T` out of the result slot and resetting the
/// state to IDLE.
///
/// This never returns if no worker thread is servicing requests.
///
/// NOTE(review): the handshake uses one shared request index and state, so
/// overlapping calls from several threads would overwrite each other's
/// request — this appears to assume a single caller at a time; confirm.
///
/// NOTE(review): when the worker rejects `index` as out of range it still
/// reports DONE without updating the result slot, so this returns whatever
/// bytes the slot held before. Callers presumably must pass valid indices.
#[inline(always)]
pub fn read(&self, index: usize) -> T {
    self.request_idx.store(index, Ordering::SeqCst);
    self.state.store(STATE_READY, Ordering::SeqCst);
    while self.state.load(Ordering::SeqCst) != STATE_DONE {
        std::hint::spin_loop();
        #[cfg(target_arch = "x86_64")]
        { std::arch::x86_64::_mm_pause(); }
        #[cfg(target_arch = "aarch64")]
        // SAFETY: `yield` is a hint with no operands or side effects.
        unsafe { std::arch::asm!("yield"); }
    }
    // SAFETY: the result slot lies inside the slab at `result_slot_offset`,
    // and the worker published its contents before storing DONE.
    // NOTE(review): assumes the slot address is suitably aligned for `T`.
    let result: T = unsafe {
        let src = self.slab.ptr().add(self.result_slot_offset) as *const T;
        std::ptr::read_volatile(src)
    };
    self.state.store(STATE_IDLE, Ordering::SeqCst);
    result
}
}