use std::cell::UnsafeCell;
use std::sync::atomic::{fence, AtomicU32, Ordering};
/// Outcome of one optimistic [`seqlock::read`] of a slot.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SlotRead {
    /// The version equals the caller's `last_seen`; the copy closure was not run.
    Unchanged,
    /// The copy ran and the version was the same (even) value before and after it;
    /// `version` is that value and should become the caller's new `last_seen`.
    Updated { version: u32 },
    /// The version was odd (write in progress) or changed during the copy; any copied
    /// bytes must be discarded and the read retried later.
    Torn,
}

/// Minimal sequence-lock primitives over a single `AtomicU32` version counter.
///
/// Writer side: `begin_write` makes the version odd, the caller copies the payload, then
/// `end_write` publishes the next even value. Reader side: `read` rejects odd versions,
/// runs the caller's copy, and re-checks the version so an overlapping write is reported
/// as `SlotRead::Torn`.
///
/// NOTE(review): `begin_write` is a plain load + store, not an atomic read-modify-write,
/// so two concurrent writers on the same version are not excluded. The protocol assumes
/// one writer per slot at a time. The payload copies are also non-atomic and race with
/// the writer. The version re-check discards torn results, but under the Rust memory
/// model this is formally a data race. Confirm this is acceptable for the target platforms.
pub mod seqlock {
    use super::*;

    /// Starts a write: makes `version` odd and returns that odd value for `end_write`.
    ///
    /// If `version` is already odd (a previous write never reached `end_write`), the same
    /// odd value is reused instead of advancing.
    #[inline]
    pub fn begin_write(version: &AtomicU32) -> u32 {
        let cur = version.load(Ordering::Relaxed);
        // Even -> next odd value; odd stays as-is. Wrapping keeps parity because 2^32 is even.
        let odd = if cur & 1 == 0 {
            cur.wrapping_add(1)
        } else {
            cur
        };
        version.store(odd, Ordering::Relaxed);
        // Intended to keep the odd store ordered before the payload writes that follow;
        // it pairs with the acquire fence in `read`.
        fence(Ordering::Release);
        odd
    }

    /// Finishes a write started by `begin_write` by storing `odd + 1` (even).
    ///
    /// The store is Release, so a reader that acquires the new version also observes the
    /// payload written before this call.
    #[inline]
    pub fn end_write(version: &AtomicU32, odd: u32) {
        version.store(odd.wrapping_add(1), Ordering::Release);
    }

    /// Optimistically reads a slot guarded by `version`.
    ///
    /// * Returns [`SlotRead::Torn`] without running `copy` if the version is odd.
    /// * Returns [`SlotRead::Unchanged`] without running `copy` if the version equals
    ///   `last_seen`.
    /// * Otherwise runs `copy`, then reloads the version. Returns
    ///   [`SlotRead::Updated`] if it did not move, or [`SlotRead::Torn`] if it did.
    ///
    /// On `Torn`, whatever `copy` produced must be ignored by the caller.
    #[inline]
    pub fn read(version: &AtomicU32, last_seen: u32, mut copy: impl FnMut()) -> SlotRead {
        let s1 = version.load(Ordering::Acquire);
        // Odd version: a writer is between begin_write and end_write.
        if s1 & 1 == 1 {
            return SlotRead::Torn;
        }
        if s1 == last_seen {
            return SlotRead::Unchanged;
        }
        copy();
        // Keep the copy's loads from being reordered after the second version load.
        fence(Ordering::Acquire);
        let s2 = version.load(Ordering::Acquire);
        // Any version movement during the copy means the copied bytes may be mixed.
        if s1 != s2 {
            SlotRead::Torn
        } else {
            SlotRead::Updated { version: s2 }
        }
    }
}
/// Lock-free set of chunk indices, packed 32 bits per `AtomicU32` word.
///
/// Writers set bits with `mark`; the reader collects and clears them with `drain_into`.
/// The word storage is a boxed slice created once, so its address never changes.
pub struct DirtyBitmap {
    words: Box<[AtomicU32]>,
}

impl DirtyBitmap {
    /// Number of chunk bits stored in each word.
    const BITS_PER_WORD: usize = 32;

    /// Maps a chunk index to `(word index, single-bit mask within that word)`.
    #[inline]
    fn split(chunk: usize) -> (usize, u32) {
        let word = chunk / Self::BITS_PER_WORD;
        let mask = 1u32 << (chunk % Self::BITS_PER_WORD);
        (word, mask)
    }

    /// Creates an all-clear bitmap that can track at least `max_chunks` indices.
    ///
    /// Capacity is rounded up to whole 32-bit words, with a minimum of one word.
    pub fn new(max_chunks: usize) -> Self {
        let word_count = usize::max(max_chunks.div_ceil(Self::BITS_PER_WORD), 1);
        let mut storage = Vec::with_capacity(word_count);
        storage.resize_with(word_count, || AtomicU32::new(0));
        Self {
            words: storage.into_boxed_slice(),
        }
    }

    /// Sets the bit for `chunk` with Release ordering.
    ///
    /// # Panics
    ///
    /// Panics if `chunk` lies beyond the bitmap's word capacity.
    #[inline]
    pub fn mark(&self, chunk: usize) {
        let (word, mask) = Self::split(chunk);
        self.words[word].fetch_or(mask, Ordering::Release);
    }

    /// Reports whether the bit for `chunk` is currently set (Acquire load).
    ///
    /// # Panics
    ///
    /// Panics if `chunk` lies beyond the bitmap's word capacity.
    #[inline]
    pub fn is_marked(&self, chunk: usize) -> bool {
        let (word, mask) = Self::split(chunk);
        (self.words[word].load(Ordering::Acquire) & mask) != 0
    }

    /// Clears every word with an atomic swap and appends the indices that were set.
    ///
    /// Indices are appended to `out` in ascending order. Existing contents of `out`
    /// are kept.
    pub fn drain_into(&self, out: &mut Vec<usize>) {
        let mut base = 0usize;
        for word in self.words.iter() {
            let mut pending = word.swap(0, Ordering::Acquire);
            while pending != 0 {
                out.push(base + pending.trailing_zeros() as usize);
                // Drop the lowest set bit and continue with the rest.
                pending &= pending - 1;
            }
            base += Self::BITS_PER_WORD;
        }
    }

    /// Address of the first word, for writers that set bits through a raw pointer.
    pub fn words_addr(&self) -> usize {
        self.words.as_ptr() as usize
    }
}
/// Storage for one chunk of consecutive slots.
///
/// Both parts are boxed slices allocated once, so their addresses stay fixed even when
/// the owning arena pushes more chunks.
struct Chunk {
    /// One seqlock version counter per slot in the chunk.
    versions: Box<[AtomicU32]>,
    /// Raw slot bytes, `stride` bytes per slot. Wrapped in `UnsafeCell` because values
    /// are written through shared references.
    values: Box<[UnsafeCell<u8>]>,
}
/// Address-only handle to one arena slot, for writers that cannot borrow the arena.
///
/// Produced by `SharedArena::slot_binding` and consumed by `foreign_write`. The fields
/// are plain integers, so the handle is `Copy` and carries no lifetime. Keeping the
/// addresses valid is the holder's responsibility.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct SlotBinding {
    /// Address of the slot's first value byte.
    pub value_addr: usize,
    /// Address of the slot's `AtomicU32` seqlock version.
    pub version_addr: usize,
    /// Index of the chunk containing the slot, which is also its dirty-bitmap bit.
    pub chunk: usize,
}
/// Writes `bytes` into a slot using only raw addresses, without borrowing the arena.
///
/// It follows the same sequence as `SharedArena::write_value`:
/// 1. make the slot's version odd;
/// 2. copy the bytes;
/// 3. bump the version back to even;
/// 4. set the owning chunk's bit in the dirty bitmap, so the next `descend` rescans that chunk.
///
/// # Safety
///
/// The caller must guarantee all of the following:
/// - `binding` came from `SharedArena::slot_binding` on an arena that is still alive.
///   `version_addr` must therefore point to a live, aligned `AtomicU32`, and
///   `value_addr` to that slot's storage.
/// - `bytes.len()` does not exceed the arena's stride. This function cannot check it.
///   A longer slice overwrites neighbouring slots or runs past the end of the chunk.
/// - `dirty_words_addr` came from `SharedArena::dirty_words_addr` on the same arena,
///   so word `binding.chunk / 32` is in bounds.
/// - No other writer targets the same slot concurrently. `seqlock::begin_write` is a
///   plain load + store and does not exclude a second writer.
/// - `bytes` does not overlap the slot's storage.
#[inline]
pub unsafe fn foreign_write(binding: SlotBinding, dirty_words_addr: usize, bytes: &[u8]) {
    // SAFETY: the caller guarantees `version_addr` points to a live, aligned `AtomicU32`.
    let version = unsafe { &*(binding.version_addr as *const AtomicU32) };
    let odd = seqlock::begin_write(version);
    // SAFETY: the caller guarantees the destination has at least `bytes.len()` writable
    // bytes that do not overlap `bytes`.
    unsafe {
        std::ptr::copy_nonoverlapping(bytes.as_ptr(), binding.value_addr as *mut u8, bytes.len());
    }
    seqlock::end_write(version, odd);
    // Set the dirty bit only after the version is even again. The Release here pairs with
    // the Acquire swap in `DirtyBitmap::drain_into`.
    let word = binding.chunk / 32;
    let bit = binding.chunk % 32;
    let words = dirty_words_addr as *const AtomicU32;
    // SAFETY: the caller guarantees `word` indexes into the live dirty bitmap.
    unsafe { (*words.add(word)).fetch_or(1u32 << bit, Ordering::Release) };
}
/// Summary of one `SharedArena::descend` pass.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct DescendResult {
    /// Mirror byte ranges `(offset, len)` that were refreshed, merged across adjacent slots.
    pub ranges: Vec<(usize, usize)>,
    /// Number of slots copied into the mirror.
    pub updated: usize,
    /// Number of slot reads that overlapped a write and were skipped.
    pub torn: usize,
    /// Number of chunk bits drained from the dirty bitmap.
    pub chunks: usize,
}
/// Chunked slot arena written through seqlocks and read back into a contiguous mirror.
///
/// Writers update slot storage and set per-chunk dirty bits. `descend` copies changed
/// slots into `mirror`.
pub struct SharedArena {
    // --- geometry ---
    /// Bytes per slot value.
    stride: usize,
    /// Slots per chunk.
    chunk_slots: usize,
    /// Upper bound on `chunks.len()`.
    max_chunks: usize,

    // --- writer-visible storage ---
    /// Chunk `c` holds slots `c * chunk_slots .. (c + 1) * chunk_slots`.
    chunks: Vec<Chunk>,
    /// One bit per chunk, set after a write and drained by `descend`.
    dirty: DirtyBitmap,

    // --- allocation bookkeeping ---
    /// `chunks.len() * chunk_slots`.
    capacity_slots: usize,
    /// High-water mark: slots at or above it have never been handed out.
    next_slot: usize,
    /// Freed slots, reused last-in first-out by `allocate`.
    free_list: Vec<usize>,

    // --- reader-side state ---
    /// Per slot, the version most recently copied into `mirror`.
    last_seen: Vec<u32>,
    /// Reader copy of all slot bytes; slot `s` at `s * stride`.
    mirror: Vec<u8>,

    // --- reusable buffers for `descend` ---
    /// Chunk indices drained from `dirty`.
    scratch_chunks: Vec<usize>,
    /// `stride`-byte staging area for one slot copy.
    scratch_slot: Vec<u8>,
    /// Slots refreshed during the current pass.
    scratch_updated: Vec<usize>,
}
impl SharedArena {
    /// Creates an empty arena whose slots each hold `stride` bytes.
    ///
    /// Storage grows `chunk_slots` slots at a time, up to `max_chunks` chunks. No chunk is
    /// allocated until the first [`allocate`](Self::allocate). `max_chunks` also fixes
    /// the size of the dirty bitmap, which is created here and never replaced.
    ///
    /// # Panics
    ///
    /// Panics if any argument is zero.
    pub fn new(stride: usize, chunk_slots: usize, max_chunks: usize) -> Self {
        assert!(stride > 0 && chunk_slots > 0 && max_chunks > 0);
        Self {
            stride,
            chunk_slots,
            max_chunks,
            chunks: Vec::new(),
            dirty: DirtyBitmap::new(max_chunks),
            capacity_slots: 0,
            next_slot: 0,
            free_list: Vec::new(),
            last_seen: Vec::new(),
            mirror: Vec::new(),
            scratch_chunks: Vec::new(),
            scratch_slot: vec![0u8; stride],
            scratch_updated: Vec::new(),
        }
    }

    /// Size in bytes of every slot value.
    pub fn stride(&self) -> usize {
        self.stride
    }

    /// Number of slots per chunk.
    pub fn chunk_slots(&self) -> usize {
        self.chunk_slots
    }

    /// Number of slots backed by allocated chunks.
    pub fn capacity_slots(&self) -> usize {
        self.capacity_slots
    }

    /// Number of slots currently handed out and not freed.
    pub fn len(&self) -> usize {
        self.next_slot - self.free_list.len()
    }

    /// `true` when no slots are currently handed out.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Hands out a slot index.
    ///
    /// Reuses the most recently freed slot if there is one. Otherwise it takes the next
    /// never-used index, growing by one chunk when capacity is exhausted.
    ///
    /// # Panics
    ///
    /// Panics if a new chunk is needed and `max_chunks` chunks already exist.
    pub fn allocate(&mut self) -> usize {
        if let Some(slot) = self.free_list.pop() {
            return slot;
        }
        if self.next_slot == self.capacity_slots {
            self.grow_one_chunk();
        }
        let slot = self.next_slot;
        self.next_slot += 1;
        slot
    }

    /// Returns `slot` to the free list so a later [`allocate`](Self::allocate) can reuse it.
    ///
    /// The slot's storage, version, and mirror bytes are left untouched.
    ///
    /// # Panics
    ///
    /// Panics if `slot` was never handed out. In debug builds it also panics if `slot`
    /// is already on the free list (double free).
    pub fn free(&mut self, slot: usize) {
        // A real assert: in release builds an out-of-range slot would otherwise be handed
        // out again by `allocate` and could make `len()` underflow.
        assert!(
            slot < self.next_slot,
            "SharedArena::free: slot {slot} was never allocated (next_slot = {})",
            self.next_slot
        );
        // Linear scan of the free list, so only checked in debug builds.
        debug_assert!(
            !self.free_list.contains(&slot),
            "SharedArena::free: slot {slot} freed twice"
        );
        self.free_list.push(slot);
    }

    /// Appends one zeroed chunk and extends `last_seen` and `mirror` to the new capacity.
    ///
    /// Existing chunk storage is not moved: each chunk owns its own boxed slices.
    ///
    /// # Panics
    ///
    /// Panics if `max_chunks` chunks already exist.
    fn grow_one_chunk(&mut self) {
        assert!(
            self.chunks.len() < self.max_chunks,
            "SharedArena exceeded max_chunks ({})",
            self.max_chunks
        );
        let values = (0..self.chunk_slots * self.stride)
            .map(|_| UnsafeCell::new(0u8))
            .collect::<Vec<_>>()
            .into_boxed_slice();
        let versions = (0..self.chunk_slots)
            .map(|_| AtomicU32::new(0))
            .collect::<Vec<_>>()
            .into_boxed_slice();
        self.chunks.push(Chunk { values, versions });
        self.capacity_slots += self.chunk_slots;
        self.last_seen.resize(self.capacity_slots, 0);
        self.mirror.resize(self.capacity_slots * self.stride, 0);
    }

    /// Splits a slot index into `(chunk index, index within chunk)`.
    #[inline]
    fn locate(&self, slot: usize) -> (usize, usize) {
        (slot / self.chunk_slots, slot % self.chunk_slots)
    }

    /// Writes one slot value under the slot's seqlock, then marks its chunk dirty.
    ///
    /// # Panics
    ///
    /// Panics if `bytes.len() != self.stride()` or if `slot` lies beyond the allocated
    /// chunks.
    pub fn write_value(&self, slot: usize, bytes: &[u8]) {
        // Must be a real assert, not debug-only. The copy below reads `self.stride` bytes
        // from `bytes`, so a shorter slice would be an out-of-bounds read (UB) in release
        // builds, and a longer one would be silently truncated.
        assert_eq!(
            bytes.len(),
            self.stride,
            "SharedArena::write_value: value length must equal the arena stride"
        );
        let (ci, si) = self.locate(slot);
        let chunk = &self.chunks[ci];
        let version = &chunk.versions[si];
        let odd = seqlock::begin_write(version);
        // SAFETY: `si < chunk_slots`, so `si * stride .. (si + 1) * stride` lies inside
        // this chunk's `chunk_slots * stride` bytes. Those bytes are `UnsafeCell`s and may
        // be written through a shared reference. `bytes` is exactly `stride` long
        // (asserted above). `SharedArena` is `!Sync` (it holds `UnsafeCell`s), so no other
        // thread writes through `&self` concurrently.
        // NOTE(review): assumes no raw-address writer targets this slot at the same time.
        unsafe {
            let dst = chunk.values.as_ptr().add(si * self.stride) as *mut u8;
            std::ptr::copy_nonoverlapping(bytes.as_ptr(), dst, self.stride);
        }
        seqlock::end_write(version, odd);
        // Mark only after the version is even again, so a reader that drains the bit
        // sees the finished write.
        self.dirty.mark(ci);
    }

    /// Returns the raw addresses of `slot` for use with [`foreign_write`].
    ///
    /// The addresses stay valid for the arena's lifetime, because chunk storage is boxed
    /// and never moved when new chunks are added.
    ///
    /// # Panics
    ///
    /// Panics if `slot` lies beyond the allocated chunks.
    pub fn slot_binding(&self, slot: usize) -> SlotBinding {
        let (ci, si) = self.locate(slot);
        let chunk = &self.chunks[ci];
        // SAFETY: `si < chunk_slots`, so the offset stays inside this chunk's allocation.
        let value_ptr = unsafe { chunk.values.as_ptr().add(si * self.stride) };
        SlotBinding {
            value_addr: value_ptr as usize,
            version_addr: &chunk.versions[si] as *const AtomicU32 as usize,
            chunk: ci,
        }
    }

    /// Address of the dirty bitmap's first word, for use with [`foreign_write`].
    pub fn dirty_words_addr(&self) -> usize {
        self.dirty.words_addr()
    }

    /// Reader-side copy of every slot's bytes, `capacity_slots() * stride()` long.
    ///
    /// Slot `s` occupies `s * stride .. (s + 1) * stride`. Only [`descend`](Self::descend)
    /// updates it.
    pub fn mirror(&self) -> &[u8] {
        &self.mirror
    }

    /// Copies every slot that changed since its last mirrored version into the mirror.
    ///
    /// Only chunks whose dirty bit is set are scanned. Within a scanned chunk, every slot
    /// below the high-water mark is read through its seqlock, including freed ones:
    /// - unchanged slots are skipped;
    /// - consistent reads are copied into the mirror;
    /// - torn reads are counted and re-mark the chunk dirty, so a later call retries them.
    ///
    /// Returns the merged mirror byte ranges that changed, plus the number of updated
    /// slots, torn reads, and dirty chunk bits drained.
    pub fn descend(&mut self) -> DescendResult {
        // Reuse the scratch vectors across calls instead of allocating each time.
        let mut chunks = std::mem::take(&mut self.scratch_chunks);
        let mut updated = std::mem::take(&mut self.scratch_updated);
        chunks.clear();
        updated.clear();
        self.dirty.drain_into(&mut chunks);
        let stride = self.stride;
        let mut torn = 0usize;
        for &ci in &chunks {
            // Ignore bits for chunks that have not been allocated.
            if ci >= self.chunks.len() {
                continue;
            }
            let mut chunk_had_torn = false;
            let base_slot = ci * self.chunk_slots;
            for si in 0..self.chunk_slots {
                let slot = base_slot + si;
                // Slots at or past the high-water mark have never been handed out.
                if slot >= self.next_slot {
                    break;
                }
                let version = &self.chunks[ci].versions[si];
                let last = self.last_seen[slot];
                // SAFETY: `si < chunk_slots`, so the offset stays inside this chunk's
                // `chunk_slots * stride` bytes.
                let src_ptr =
                    unsafe { self.chunks[ci].values.as_ptr().add(si * stride) } as *const u8;
                let scratch = &mut self.scratch_slot;
                let outcome = seqlock::read(version, last, || {
                    // SAFETY: `src_ptr` addresses `stride` readable bytes (see above), and
                    // `scratch` is a separate `stride`-byte buffer. If a writer changes the
                    // source mid-copy, the seqlock re-check reports `Torn` and the scratch
                    // contents are not used.
                    unsafe {
                        std::ptr::copy_nonoverlapping(src_ptr, scratch.as_mut_ptr(), stride);
                    }
                });
                match outcome {
                    SlotRead::Unchanged => {}
                    SlotRead::Updated {
                        version: new_version,
                    } => {
                        self.last_seen[slot] = new_version;
                        let off = slot * stride;
                        self.mirror[off..off + stride].copy_from_slice(&self.scratch_slot);
                        updated.push(slot);
                    }
                    SlotRead::Torn => {
                        torn += 1;
                        chunk_had_torn = true;
                    }
                }
            }
            if chunk_had_torn {
                // Leave the chunk dirty so the next descend retries its torn slots.
                self.dirty.mark(ci);
            }
        }
        let ranges = self.coalesce(&mut updated);
        let chunks_descended = chunks.len();
        let updated_count = updated.len();
        self.scratch_chunks = chunks;
        self.scratch_updated = updated;
        DescendResult {
            ranges,
            updated: updated_count,
            torn,
            chunks: chunks_descended,
        }
    }

    /// Sorts `updated` slot indices and merges consecutive runs into mirror byte ranges.
    ///
    /// Each range is `(first_slot * stride, run_length * stride)`. Assumes `updated`
    /// contains no duplicates, which holds for `descend`: it visits each drained chunk,
    /// and therefore each slot, at most once per call.
    fn coalesce(&self, updated: &mut [usize]) -> Vec<(usize, usize)> {
        updated.sort_unstable();
        let Some((&first, rest)) = updated.split_first() else {
            return Vec::new();
        };
        let to_bytes =
            |start: usize, end: usize| (start * self.stride, (end - start + 1) * self.stride);
        let mut ranges = Vec::new();
        let mut run_start = first;
        let mut run_end = first;
        for &slot in rest {
            if slot == run_end + 1 {
                run_end = slot;
            } else {
                ranges.push(to_bytes(run_start, run_end));
                run_start = slot;
                run_end = slot;
            }
        }
        ranges.push(to_bytes(run_start, run_end));
        ranges
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Slot width shared by every arena test.
    const STRIDE: usize = 64;

    /// Distinct `STRIDE`-byte test pattern: byte `i` is `seed + i` (wrapping).
    fn ramp(seed: u8) -> Vec<u8> {
        let mut pattern = Vec::with_capacity(STRIDE);
        for offset in 0..STRIDE as u8 {
            pattern.push(seed.wrapping_add(offset));
        }
        pattern
    }

    /// New arena with `count` slots already allocated.
    fn arena_with_slots(chunk_slots: usize, max_chunks: usize, count: usize) -> SharedArena {
        let mut arena = SharedArena::new(STRIDE, chunk_slots, max_chunks);
        for _ in 0..count {
            arena.allocate();
        }
        arena
    }

    /// The mirror bytes belonging to `slot`.
    fn mirrored(arena: &SharedArena, slot: usize) -> &[u8] {
        let start = slot * STRIDE;
        &arena.mirror()[start..start + STRIDE]
    }

    #[test]
    fn seqlock_odd_even_cycle() {
        let version = AtomicU32::new(0);
        let odd = seqlock::begin_write(&version);
        assert_eq!(odd, 1);
        assert_eq!(version.load(Ordering::Relaxed) & 1, 1, "version odd during write");
        seqlock::end_write(&version, odd);
        let after = version.load(Ordering::Relaxed);
        assert_eq!(after, 2);
        assert_eq!(after & 1, 0, "version even after write");
    }

    #[test]
    fn seqlock_read_unchanged_updated_clean() {
        let version = AtomicU32::new(4);
        let noop = || {};
        assert_eq!(seqlock::read(&version, 4, noop), SlotRead::Unchanged);
        assert_eq!(seqlock::read(&version, 2, noop), SlotRead::Updated { version: 4 });
    }

    #[test]
    fn seqlock_read_detects_odd_start() {
        let version = AtomicU32::new(3);
        assert_eq!(seqlock::read(&version, 0, || {}), SlotRead::Torn);
    }

    #[test]
    fn seqlock_read_detects_interleaved_write() {
        // Whether the overlapping write is still running (odd) or already finished (even),
        // any version change during the copy must be reported as torn.
        for bumped in [3u32, 4] {
            let version = AtomicU32::new(2);
            let outcome = seqlock::read(&version, 0, || version.store(bumped, Ordering::Relaxed));
            assert_eq!(outcome, SlotRead::Torn, "version bumped to {bumped}");
        }
    }

    #[test]
    fn dirty_bitmap_mark_and_drain() {
        let bitmap = DirtyBitmap::new(100);
        for chunk in [0, 5, 63] {
            bitmap.mark(chunk);
        }
        assert!(bitmap.is_marked(5));
        let mut drained = Vec::new();
        bitmap.drain_into(&mut drained);
        assert_eq!(drained, vec![0, 5, 63]);
        assert!(!bitmap.is_marked(5), "drain clears bits");
        drained.clear();
        bitmap.drain_into(&mut drained);
        assert!(drained.is_empty(), "second drain is empty");
    }

    #[test]
    fn write_then_descend_roundtrip() {
        let mut arena = SharedArena::new(STRIDE, 4, 16);
        let slots: Vec<usize> = (0..3).map(|_| arena.allocate()).collect();
        assert_eq!(slots, vec![0, 1, 2]);
        let seeds = [10u8, 20, 30];
        for (&slot, &seed) in slots.iter().zip(&seeds) {
            arena.write_value(slot, &ramp(seed));
        }

        let first = arena.descend();
        assert_eq!(first.torn, 0);
        assert_eq!(first.updated, 3);
        assert_eq!(first.ranges, vec![(0, 3 * STRIDE)]);
        for (&slot, &seed) in slots.iter().zip(&seeds) {
            assert_eq!(mirrored(&arena, slot), &ramp(seed)[..]);
        }

        let second = arena.descend();
        assert!(second.ranges.is_empty());
        assert_eq!(second.updated, 0);
    }

    #[test]
    fn descend_coalesces_noncontiguous_runs() {
        let mut arena = arena_with_slots(8, 16, 8);
        for (slot, seed) in [(0, 1u8), (1, 2), (2, 3), (5, 4)] {
            arena.write_value(slot, &ramp(seed));
        }
        let result = arena.descend();
        assert_eq!(result.updated, 4);
        assert_eq!(result.ranges, vec![(0, 3 * STRIDE), (5 * STRIDE, STRIDE)]);
    }

    #[test]
    fn dirty_scan_tracks_touched_chunks_not_total() {
        let mut arena = arena_with_slots(4, 64, 200);
        arena.write_value(100, &ramp(7));
        // Peek at the bitmap directly: only chunk 100 / 4 = 25 should be dirty.
        let mut probe = Vec::new();
        arena.dirty.drain_into(&mut probe);
        assert_eq!(probe, vec![25]);
        // Restore the bit the probe consumed so descend still visits the chunk.
        arena.dirty.mark(25);
        let result = arena.descend();
        assert_eq!(result.updated, 1);
        assert_eq!(result.ranges, vec![(100 * STRIDE, STRIDE)]);
    }

    #[test]
    fn stable_addressing_across_grow() {
        let mut arena = arena_with_slots(4, 16, 4);
        let before = [arena.slot_binding(0), arena.slot_binding(3)];
        let dirty_before = arena.dirty_words_addr();
        for _ in 0..20 {
            arena.allocate();
        }
        assert_eq!(arena.slot_binding(0), before[0], "slot 0 address moved on grow");
        assert_eq!(arena.slot_binding(3), before[1], "slot 3 address moved on grow");
        assert_eq!(arena.dirty_words_addr(), dirty_before, "bitmap address moved");
    }

    #[test]
    fn foreign_write_matches_in_process_write() {
        let mut arena = arena_with_slots(4, 16, 4);
        let binding = arena.slot_binding(2);
        let dirty_words = arena.dirty_words_addr();
        // SAFETY: both addresses come from this live arena, no other writer exists, and
        // `ramp` yields exactly STRIDE bytes.
        unsafe { foreign_write(binding, dirty_words, &ramp(42)) };
        let result = arena.descend();
        assert_eq!(result.updated, 1);
        assert_eq!(result.torn, 0);
        assert_eq!(result.ranges, vec![(2 * STRIDE, STRIDE)]);
        assert_eq!(mirrored(&arena, 2), &ramp(42)[..]);
    }

    #[test]
    fn allocate_reuses_freed_slots() {
        let mut arena = SharedArena::new(STRIDE, 4, 16);
        let first = arena.allocate();
        let second = arena.allocate();
        assert_eq!(arena.len(), 2);
        arena.free(first);
        assert_eq!(arena.len(), 1);
        let reused = arena.allocate();
        assert_eq!(reused, first, "freed slot is reused");
        assert_ne!(second, reused);
        assert_eq!(arena.len(), 2);
    }
}