ultraslayer 0.2.3

Ultra‑low latency DRAM refresh‑stall prevention using hardware‑level hedging.
Documentation
use std::marker::PhantomData;
use std::sync::atomic::{AtomicU8, AtomicUsize, Ordering};
use std::sync::Arc;
use crossbeam::utils::CachePadded;

use crate::arch::ArchConfig;
use crate::slab::HugeSlab;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SpinPolicy {
    Busy = 0,
    HybridYield = 1,
    Sleep = 2,
}

pub struct SlayerStats {
    pub total_reads: u64,
    pub wins: u64,
    pub misses: u64,
}

const STATE_IDLE: u8 = 0;
const STATE_READY: u8 = 1;
const STATE_DONE: u8 = 2;

pub const MAX_T_SIZE: usize = 64;
const RESULT_SLOT_BACK_OFFSET: usize = MAX_T_SIZE * 2;

struct SeqLock {
    seq: AtomicUsize,
}

impl SeqLock {
    fn new() -> Self { Self { seq: AtomicUsize::new(0) } }
    #[inline(always)] fn write_begin(&self) { self.seq.fetch_add(1, Ordering::Release); }
    #[inline(always)] fn write_end(&self) { self.seq.fetch_add(1, Ordering::Release); }
    #[inline(always)] fn read_begin(&self) -> usize { self.seq.load(Ordering::Acquire) }
    #[inline(always)] fn read_end(&self, start: usize) -> bool {
        (start & 1 == 0) && (self.seq.load(Ordering::Acquire) == start)
    }
}

pub struct UltraSlayer<T> {
    slab: Arc<HugeSlab>,
    request_idx: Arc<CachePadded<AtomicUsize>>,
    state: Arc<CachePadded<AtomicU8>>,
    spin_policy: Arc<AtomicU8>,
    result_slot_offset: usize,
    num_replicas: usize,
    config: ArchConfig,
    elem_size: usize,
    seqlock: Arc<SeqLock>,
    #[allow(dead_code)]
    capacity: usize,
    _marker: PhantomData<T>,
    total_reads: Arc<AtomicUsize>,
    wins: Arc<AtomicUsize>,
    misses: Arc<AtomicUsize>,
}

impl<T: Copy + Send + Sync + 'static> UltraSlayer<T> {
    pub fn new(num_replicas: usize, capacity: usize) -> Self {
        assert!(num_replicas >= 2, "num_replicas must be >= 2 for hedging");
        let elem_size = std::mem::size_of::<T>();
        assert!(elem_size <= MAX_T_SIZE, "T is {elem_size} bytes; MAX_T_SIZE is {MAX_T_SIZE}");

        let config = ArchConfig::for_platform();
        assert!(config.replica_offset >= capacity * elem_size, "replica_offset too small");

        let data_size = config.replica_offset * num_replicas;
        let slab_size = data_size.checked_add(RESULT_SLOT_BACK_OFFSET).expect("slab size overflow");

        Self {
            slab: Arc::new(HugeSlab::new(slab_size)),
            request_idx: Arc::new(CachePadded::new(AtomicUsize::new(0))),
            state: Arc::new(CachePadded::new(AtomicU8::new(STATE_IDLE))),
            spin_policy: Arc::new(AtomicU8::new(SpinPolicy::Busy as u8)),
            result_slot_offset: data_size,
            num_replicas,
            config,
            elem_size,
            seqlock: Arc::new(SeqLock::new()),
            capacity,
            _marker: PhantomData,
            total_reads: Arc::new(AtomicUsize::new(0)),
            wins: Arc::new(AtomicUsize::new(0)),
            misses: Arc::new(AtomicUsize::new(0)),
        }
    }

    #[cfg(feature = "slice")]
    pub unsafe fn slice(&self) -> crate::slice::Slice<'_, T> {
        crate::slice::Slice::from_raw_parts(self.slab.ptr() as *mut T, self.capacity)
    }

    pub fn set_spin_policy(&self, policy: SpinPolicy) {
        self.spin_policy.store(policy as u8, Ordering::SeqCst);
    }

    pub fn stats(&self) -> SlayerStats {
        SlayerStats {
            total_reads: self.total_reads.load(Ordering::Relaxed) as u64,
            wins: self.wins.load(Ordering::Relaxed) as u64,
            misses: self.misses.load(Ordering::Relaxed) as u64,
        }
    }

    pub fn pin_to_core(&self, core_id: usize) {
        let core_ids = core_affinity::get_core_ids().expect("core_affinity failed");
        let target = core_ids.into_iter().find(|c| c.id == core_id).unwrap();
        core_affinity::set_for_current(target);
    }

    pub fn insert(&self, index: usize, value: T) {
        let size = self.elem_size;
        let slab_size = self.slab.size();
        self.seqlock.write_begin();
        for r in 0..self.num_replicas {
            let offset = self.config.replica_offset * r + index * size;
            assert!(offset + size <= slab_size, "insert out of bounds");
            unsafe { std::ptr::copy_nonoverlapping(&value as *const T as *const u8, self.slab.ptr().add(offset), size); }
        }
        self.seqlock.write_end();
        std::sync::atomic::fence(Ordering::Release);
    }

    pub fn spawn_slayer_core(&self, cpu_id: usize) {
        let (request_idx, state, spin_policy, slab, replica_offset, result_slot_offset, size, num_replicas, seqlock, total_reads, wins, misses) = 
            (Arc::clone(&self.request_idx), Arc::clone(&self.state), Arc::clone(&self.spin_policy), Arc::clone(&self.slab), self.config.replica_offset, self.result_slot_offset, self.elem_size, self.num_replicas, Arc::clone(&self.seqlock), Arc::clone(&self.total_reads), Arc::clone(&self.wins), Arc::clone(&self.misses));

        std::thread::Builder::new().name(format!("slayer-core-{cpu_id}")).spawn(move || {
            let core_ids = core_affinity::get_core_ids().unwrap();
            core_affinity::set_for_current(core_ids.into_iter().find(|c| c.id == cpu_id).unwrap());
            let slab_ptr = slab.ptr();
            let slab_size = slab.size();
            unsafe { for i in (0..slab_size).step_by(4096) { std::ptr::write_volatile(slab_ptr.add(i), 0); } }

            loop {
                while state.load(Ordering::SeqCst) != STATE_READY {
                    let policy = spin_policy.load(Ordering::Relaxed);
                    match policy {
                        0 => { 
                            std::hint::spin_loop();
                            #[cfg(target_arch = "x86_64")] { std::arch::x86_64::_mm_pause(); }
                            #[cfg(target_arch = "aarch64")] { std::arch::asm!("yield"); }
                        },
                        1 => { for _ in 0..100 { std::hint::spin_loop(); } std::thread::yield_now(); },
                        _ => { std::thread::sleep(std::time::Duration::from_micros(10)); }
                    }
                }
                let idx = request_idx.load(Ordering::SeqCst);
                total_reads.fetch_add(1, Ordering::Relaxed);
                let max_offset = replica_offset * (num_replicas - 1) + idx * size + size;
                if max_offset > result_slot_offset {
                    state.store(STATE_DONE, Ordering::SeqCst);
                    misses.fetch_add(1, Ordering::Relaxed);
                    continue;
                }
                unsafe {
                    let base = slab.ptr();
                    let words = size.div_ceil(8).min(8);
                    loop {
                        let seq_start = seqlock.read_begin();
                        for r in 0..num_replicas.min(2) {
                            let addr = base.add(replica_offset * r + idx * size);
                            #[cfg(target_arch = "x86_64")] std::arch::x86_64::_mm_prefetch(addr as *const i8, std::arch::x86_64::_MM_HINT_T0);
                        }
                        let mut replica_bufs: [[u64; 8]; 2] = [[0u64; 8]; 2];
                        for r in 0..num_replicas.min(2) {
                            let addr = base.add(replica_offset * r + idx * size) as *const u64;
                            for w in 0..words { replica_bufs[r][w] = std::ptr::read_volatile(addr.add(w)); }
                        }
                        if seqlock.read_end(seq_start) {
                            let result_dst = base.add(result_slot_offset) as *mut u64;
                            for w in 0..words { std::ptr::write_volatile(result_dst.add(w), replica_bufs[0][w]); }
                            wins.fetch_add(1, Ordering::Relaxed);
                            break;
                        }
                    }
                }
                state.store(STATE_DONE, Ordering::SeqCst);
            }
        }).unwrap();
    }

    #[inline(always)]
    pub fn read(&self, index: usize) -> T {
        self.request_idx.store(index, Ordering::SeqCst);
        self.state.store(STATE_READY, Ordering::SeqCst);
        while self.state.load(Ordering::SeqCst) != STATE_DONE {
            std::hint::spin_loop();
            #[cfg(target_arch = "x86_64")] { std::arch::x86_64::_mm_pause(); }
            #[cfg(target_arch = "aarch64")] { std::arch::asm!("yield"); }
        }
        let result: T = unsafe { std::ptr::read_volatile(self.slab.ptr().add(self.result_slot_offset) as *const T) };
        self.state.store(STATE_IDLE, Ordering::SeqCst);
        result
    }
}