#![cfg_attr(not(feature = "rt"), allow(dead_code))]
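//! A slab of pre-allocated storage for a single data type.
//!
//! Storage is divided into `NUM_PAGES` pages. The first page holds
//! `PAGE_INITIAL_SIZE` slots and each subsequent page doubles the capacity of
//! the one before it. A page's slot storage is allocated on first use and can
//! be released again by `Slab::compact` once every slot on it is free.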
use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::{AtomicBool, AtomicUsize};
use crate::loom::sync::{Arc, Mutex};
use crate::util::bit;
use std::fmt;
use std::mem;
use std::ops;
use std::ptr;
use std::sync::atomic::Ordering::Relaxed;
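/// Pre-allocated storage organized as a fixed set of pages of exponentially
/// increasing size.
///
/// `Slab` is the read handle: `get` and `for_each` go through a per-page
/// cache of the slot array pointer, avoiding the page lock on the hot path.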
pub(crate) struct Slab<T> {
pages: [Arc<Page<T>>; NUM_PAGES],
cached: [CachedPage<T>; NUM_PAGES],
}
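/// Handle used to allocate new entries in the slab.
///
/// Holds a strong reference to every page; each value it hands out is backed
/// by a `Ref` that keeps its page alive.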
pub(crate) struct Allocator<T> {
pages: [Arc<Page<T>>; NUM_PAGES],
}
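/// Global index of a slot across all pages.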
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) struct Address(usize);
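/// Types stored in the slab must be resettable, since slots are reused
/// without being dropped and re-created.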
pub(crate) trait Entry: Default {
fn reset(&self);
}
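/// Owning reference to a slot. Dropping the `Ref` releases the slot back to
/// its page's free stack.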
pub(crate) struct Ref<T> {
value: *const Value<T>,
}
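// Page geometry: `NUM_PAGES` pages, the first holding `PAGE_INITIAL_SIZE`
// slots and each subsequent page doubling the previous one. Total capacity is
// 32 * (2^19 - 1) = 16_777_184 slots, just under 2^24.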
const NUM_PAGES: usize = 19;
const PAGE_INITIAL_SIZE: usize = 32;
const PAGE_INDEX_SHIFT: u32 = PAGE_INITIAL_SIZE.trailing_zeros() + 1;
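/// A page of slots. `used` mirrors `Slots::used` so occupancy can be checked
/// without taking the lock; `len` is the page's fixed capacity and `prev_len`
/// the combined capacity of all earlier pages (the page's base address).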
struct Page<T> {
slots: Mutex<Slots<T>>,
used: AtomicUsize,
allocated: AtomicBool,
len: usize,
prev_len: usize,
}
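/// Read-side cache of a page: a raw pointer to the slot array and the number
/// of slots observed to be initialized. Because a page's slot vector never
/// reallocates (its full capacity is reserved up front), the cached pointer
/// stays valid until the page is compacted.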
struct CachedPage<T> {
slots: *const Slot<T>,
init: usize,
}
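/// The lock-protected interior of a page.
///
/// `head` is the index of the first slot on the free stack, with each free
/// slot's `next` field linking to the slot released before it;
/// `head == slots.len()` means the free stack is empty.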
struct Slots<T> {
slots: Vec<Slot<T>>,
head: usize,
used: usize,
}
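// Safety: stored values are only ever accessed by shared reference, so
// `T: Sync` is the bound these impls rely on.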
unsafe impl<T: Sync> Sync for Page<T> {}
unsafe impl<T: Sync> Send for Page<T> {}
unsafe impl<T: Sync> Sync for CachedPage<T> {}
unsafe impl<T: Sync> Send for CachedPage<T> {}
unsafe impl<T: Sync> Sync for Ref<T> {}
unsafe impl<T: Sync> Send for Ref<T> {}
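/// A single slot. `repr(C)` guarantees the `value` field sits at offset zero,
/// which lets a `*const Slot<T>` be cast directly to `*const Value<T>` (see
/// `CachedPage::get` and `Slot::gen_ref`).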
#[repr(C)]
struct Slot<T> {
value: UnsafeCell<Value<T>>,
next: u32,
}
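/// A stored value together with a back-pointer to its page, so the slot can
/// be released when the `Ref` is dropped.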
struct Value<T> {
value: T,
page: *const Page<T>,
}
impl<T> Slab<T> {
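/// Create an empty slab, fixing each page's capacity and base address.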
pub(crate) fn new() -> Slab<T> {
let mut slab = Slab {
pages: Default::default(),
cached: Default::default(),
};
let mut len = PAGE_INITIAL_SIZE;
let mut prev_len: usize = 0;
for page in &mut slab.pages {
let page = Arc::get_mut(page).unwrap();
page.len = len;
page.prev_len = prev_len;
len *= 2;
prev_len += page.len;
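// The last address on this page must fit in 24 bits.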
debug_assert!(
page.len - 1 + page.prev_len < (1 << 24),
"max = {:b}",
page.len - 1 + page.prev_len
);
}
slab
}
pub(crate) fn allocator(&self) -> Allocator<T> {
Allocator {
pages: self.pages.clone(),
}
}
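/// Return a reference to the value stored at `addr`, or `None` if the slot
/// has never been initialized.
///
/// Takes `&mut self` because a cache miss refreshes the per-page cache,
/// which briefly locks the page.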
pub(crate) fn get(&mut self, addr: Address) -> Option<&T> {
let page_idx = addr.page();
let slot_idx = self.pages[page_idx].slot(addr);
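// If the slot lies past the cached initialized range, refresh the cache
// and check again; a slot still out of range was never allocated.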
if self.cached[page_idx].init <= slot_idx {
self.cached[page_idx].refresh(&self.pages[page_idx]);
}
if self.cached[page_idx].init <= slot_idx {
return None;
}
Some(self.cached[page_idx].get(slot_idx))
}
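/// Call `f` on every initialized slot, including slots currently sitting on
/// a free stack.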
pub(crate) fn for_each(&mut self, mut f: impl FnMut(&T)) {
for page_idx in 0..self.pages.len() {
self.cached[page_idx].refresh(&self.pages[page_idx]);
for slot_idx in 0..self.cached[page_idx].init {
f(self.cached[page_idx].get(slot_idx));
}
}
}
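/// Release the slot storage of every page that is allocated but fully
/// unused. Freed pages are re-allocated lazily on their next use.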
pub(crate) fn compact(&mut self) {
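// The first page is never released, so skip it. The relaxed loads are
// only a hint; the state is re-checked below with the lock held.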
for (idx, page) in self.pages.iter().enumerate().skip(1) {
if page.used.load(Relaxed) != 0 || !page.allocated.load(Relaxed) {
continue;
}
let mut slots = match page.slots.try_lock() {
Some(slots) => slots,
_ => continue,
};
if slots.used > 0 || slots.slots.capacity() == 0 {
continue;
}
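// The page is idle: mark it unallocated and move the slot storage out,
// freeing it only after the lock is dropped and the cache entry cleared.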
page.allocated.store(false, Relaxed);
let vec = mem::take(&mut slots.slots);
slots.head = 0;
drop(slots);
debug_assert!(
self.cached[idx].slots.is_null() || self.cached[idx].slots == vec.as_ptr(),
"cached = {:?}; actual = {:?}",
self.cached[idx].slots,
vec.as_ptr(),
);
self.cached[idx].slots = ptr::null();
self.cached[idx].init = 0;
drop(vec);
}
}
}
impl<T> fmt::Debug for Slab<T> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
debug(fmt, "Slab", &self.pages[..])
}
}
impl<T: Entry> Allocator<T> {
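/// Allocate a slot in the first page with room, returning its address and
/// an owning reference. Returns `None` when every page is full.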
pub(crate) fn allocate(&self) -> Option<(Address, Ref<T>)> {
for page in &self.pages[..] {
if let Some((addr, val)) = Page::allocate(page) {
return Some((addr, val));
}
}
None
}
}
impl<T> fmt::Debug for Allocator<T> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
debug(fmt, "slab::Allocator", &self.pages[..])
}
}
impl<T> ops::Deref for Ref<T> {
type Target = T;
fn deref(&self) -> &T {
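// Safety: `value` points into a page kept alive by the ref count this
// `Ref` holds.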
unsafe { &(*self.value).value }
}
}
impl<T> Drop for Ref<T> {
fn drop(&mut self) {
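// Safety: releasing the slot also reclaims the page ref count leaked in
// `gen_ref`.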
let _ = unsafe { (*self.value).release() };
}
}
impl<T: fmt::Debug> fmt::Debug for Ref<T> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
(**self).fmt(fmt)
}
}
impl<T: Entry> Page<T> {
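/// Allocate a slot on this page, reusing a released slot if one is
/// available.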
fn allocate(me: &Arc<Page<T>>) -> Option<(Address, Ref<T>)> {
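// Fast path: the unlocked `used` hint shows the page is full.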
if me.used.load(Relaxed) == me.len {
return None;
}
let mut locked = me.slots.lock();
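// If `head` points at an initialized slot, pop the free stack; otherwise
// grow the page, unless it is already at capacity.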
if locked.head < locked.slots.len() {
let locked = &mut *locked;
let idx = locked.head;
let slot = &locked.slots[idx];
locked.head = slot.next as usize;
locked.used += 1;
me.used.store(locked.used, Relaxed);
slot.value.with(|ptr| unsafe { (*ptr).value.reset() });
Some((me.addr(idx), slot.gen_ref(me)))
} else if me.len == locked.slots.len() {
None
} else {
let idx = locked.slots.len();
if idx == 0 {
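// First allocation on this page: reserve the full capacity up front so
// the vector never reallocates and cached slot pointers stay valid.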
locked.slots.reserve_exact(me.len);
}
locked.slots.push(Slot {
value: UnsafeCell::new(Value {
value: Default::default(),
page: &**me as *const _,
}),
next: 0,
});
locked.head += 1;
locked.used += 1;
me.used.store(locked.used, Relaxed);
me.allocated.store(true, Relaxed);
debug_assert_eq!(locked.slots.len(), locked.head);
Some((me.addr(idx), locked.slots[idx].gen_ref(me)))
}
}
}
impl<T> Page<T> {
fn slot(&self, addr: Address) -> usize {
addr.0 - self.prev_len
}
fn addr(&self, slot: usize) -> Address {
Address(slot + self.prev_len)
}
}
impl<T> Default for Page<T> {
fn default() -> Page<T> {
Page {
used: AtomicUsize::new(0),
allocated: AtomicBool::new(false),
slots: Mutex::new(Slots {
slots: Vec::new(),
head: 0,
used: 0,
}),
len: 0,
prev_len: 0,
}
}
}
impl<T> Page<T> {
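/// Return a released slot to the page's free stack.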
fn release(&self, value: *const Value<T>) {
let mut locked = self.slots.lock();
let idx = locked.index_for(value);
locked.slots[idx].next = locked.head as u32;
locked.head = idx;
locked.used -= 1;
self.used.store(locked.used, Relaxed);
}
}
impl<T> CachedPage<T> {
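/// Re-read the page's slot pointer and the number of initialized slots.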
fn refresh(&mut self, page: &Page<T>) {
let slots = page.slots.lock();
if !slots.slots.is_empty() {
self.slots = slots.slots.as_ptr();
self.init = slots.slots.len();
}
}
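/// Read an initialized slot without locking the page; sound because the
/// slot vector never reallocates while cached (see `Page::allocate`).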
fn get(&self, idx: usize) -> &T {
assert!(idx < self.init);
unsafe {
let slot = self.slots.add(idx);
let value = slot as *const Value<T>;
&(*value).value
}
}
}
impl<T> Default for CachedPage<T> {
fn default() -> CachedPage<T> {
CachedPage {
slots: ptr::null(),
init: 0,
}
}
}
impl<T> Slots<T> {
/// Map a raw slot pointer back to its index within this page.
fn index_for(&self, slot: *const Value<T>) -> usize {
    assert_ne!(self.slots.capacity(), 0, "page is unallocated");
    let base = self.slots.as_ptr() as usize;
    let slot = slot as usize;
    let width = mem::size_of::<Slot<T>>();
    assert!(slot >= base, "unexpected pointer");
    let idx = (slot - base) / width;
    assert!(idx < self.slots.len());
    idx
}
}
impl<T: Entry> Slot<T> {
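/// Create an owning `Ref` to this slot, leaking one strong count on the
/// page with `mem::forget`; `Value::release` reclaims it via
/// `Arc::from_raw`.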
fn gen_ref(&self, page: &Arc<Page<T>>) -> Ref<T> {
mem::forget(page.clone());
let slot = self as *const Slot<T>;
let value = slot as *const Value<T>;
Ref { value }
}
}
impl<T> Value<T> {
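/// Release the slot and recover the page `Arc` whose strong count was
/// leaked in `gen_ref`.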
fn release(&self) -> Arc<Page<T>> {
let page = unsafe { Arc::from_raw(self.page) };
page.release(self as *const _);
page
}
}
impl Address {
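/// Index of the page containing this address: adding `PAGE_INITIAL_SIZE`
/// and shifting by `PAGE_INDEX_SHIFT` maps addresses 0..32 to 0, 32..96 to
/// 1, 96..224 to 2, and so on; the position of the highest set bit then
/// selects the page.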
fn page(self) -> usize {
let slot_shifted = (self.0 + PAGE_INITIAL_SIZE) >> PAGE_INDEX_SHIFT;
(bit::pointer_width() - slot_shifted.leading_zeros()) as usize
}
pub(crate) const fn as_usize(self) -> usize {
self.0
}
pub(crate) fn from_usize(src: usize) -> Address {
Address(src)
}
}
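// Shared `Debug` body for `Slab` and `Allocator`: reports used slots and
// capacity across all allocated pages.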
fn debug<T>(fmt: &mut fmt::Formatter<'_>, name: &str, pages: &[Arc<Page<T>>]) -> fmt::Result {
let mut capacity = 0;
let mut len = 0;
for page in pages {
if page.allocated.load(Relaxed) {
capacity += page.len;
len += page.used.load(Relaxed);
}
}
fmt.debug_struct(name)
.field("len", &len)
.field("capacity", &capacity)
.finish()
}
#[cfg(all(test, not(loom)))]
mod test {
use super::*;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
struct Foo {
cnt: AtomicUsize,
id: AtomicUsize,
}
impl Default for Foo {
fn default() -> Foo {
Foo {
cnt: AtomicUsize::new(0),
id: AtomicUsize::new(0),
}
}
}
impl Entry for Foo {
fn reset(&self) {
self.cnt.fetch_add(1, SeqCst);
}
}
#[test]
fn insert_remove() {
let mut slab = Slab::<Foo>::new();
let alloc = slab.allocator();
let (addr1, foo1) = alloc.allocate().unwrap();
foo1.id.store(1, SeqCst);
assert_eq!(0, foo1.cnt.load(SeqCst));
let (addr2, foo2) = alloc.allocate().unwrap();
foo2.id.store(2, SeqCst);
assert_eq!(0, foo2.cnt.load(SeqCst));
assert_eq!(1, slab.get(addr1).unwrap().id.load(SeqCst));
assert_eq!(2, slab.get(addr2).unwrap().id.load(SeqCst));
drop(foo1);
assert_eq!(1, slab.get(addr1).unwrap().id.load(SeqCst));
let (addr3, foo3) = alloc.allocate().unwrap();
assert_eq!(addr3, addr1);
assert_eq!(1, foo3.cnt.load(SeqCst));
foo3.id.store(3, SeqCst);
assert_eq!(3, slab.get(addr3).unwrap().id.load(SeqCst));
drop(foo2);
drop(foo3);
slab.compact();
assert!(slab.get(addr1).is_some());
assert!(slab.get(addr2).is_some());
assert!(slab.get(addr3).is_some());
}
#[test]
fn insert_many() {
let mut slab = Slab::<Foo>::new();
let alloc = slab.allocator();
let mut entries = vec![];
for i in 0..10_000 {
let (addr, val) = alloc.allocate().unwrap();
val.id.store(i, SeqCst);
entries.push((addr, val));
}
for (i, (addr, v)) in entries.iter().enumerate() {
assert_eq!(i, v.id.load(SeqCst));
assert_eq!(i, slab.get(*addr).unwrap().id.load(SeqCst));
}
entries.clear();
for i in 0..10_000 {
let (addr, val) = alloc.allocate().unwrap();
val.id.store(10_000 - i, SeqCst);
entries.push((addr, val));
}
for (i, (addr, v)) in entries.iter().enumerate() {
assert_eq!(10_000 - i, v.id.load(SeqCst));
assert_eq!(10_000 - i, slab.get(*addr).unwrap().id.load(SeqCst));
}
}
#[test]
fn insert_drop_reverse() {
let mut slab = Slab::<Foo>::new();
let alloc = slab.allocator();
let mut entries = vec![];
for i in 0..10_000 {
let (addr, val) = alloc.allocate().unwrap();
val.id.store(i, SeqCst);
entries.push((addr, val));
}
for _ in 0..10 {
for _ in 0..1_000 {
entries.pop();
}
for (i, (addr, v)) in entries.iter().enumerate() {
assert_eq!(i, v.id.load(SeqCst));
assert_eq!(i, slab.get(*addr).unwrap().id.load(SeqCst));
}
}
}
#[test]
fn no_compaction_if_page_still_in_use() {
let mut slab = Slab::<Foo>::new();
let alloc = slab.allocator();
let mut entries1 = vec![];
let mut entries2 = vec![];
for i in 0..10_000 {
let (addr, val) = alloc.allocate().unwrap();
val.id.store(i, SeqCst);
if i % 2 == 0 {
entries1.push((addr, val, i));
} else {
entries2.push(val);
}
}
drop(entries2);
for (addr, _, i) in &entries1 {
assert_eq!(*i, slab.get(*addr).unwrap().id.load(SeqCst));
}
}
#[test]
fn compact_all() {
let mut slab = Slab::<Foo>::new();
let alloc = slab.allocator();
let mut entries = vec![];
for _ in 0..2 {
entries.clear();
for i in 0..10_000 {
let (addr, val) = alloc.allocate().unwrap();
val.id.store(i, SeqCst);
entries.push((addr, val));
}
let mut addrs = vec![];
for (addr, _) in entries.drain(..) {
addrs.push(addr);
}
slab.compact();
for addr in &addrs[PAGE_INITIAL_SIZE..] {
assert!(slab.get(*addr).is_none());
}
}
}
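// Repeatedly fill, drain, and compact the slab; every surviving entry must
// stay reachable (regression test for tokio issue #3014).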
#[test]
fn issue_3014() {
let mut slab = Slab::<Foo>::new();
let alloc = slab.allocator();
let mut entries = vec![];
for _ in 0..5 {
entries.clear();
for i in 0..(32 + 64 + 128 + 1) {
let (addr, val) = alloc.allocate().unwrap();
val.id.store(i, SeqCst);
entries.push((addr, val, i));
}
for (addr, val, i) in &entries {
assert_eq!(*i, val.id.load(SeqCst));
assert_eq!(*i, slab.get(*addr).unwrap().id.load(SeqCst));
}
entries.pop();
slab.compact();
for (addr, val, i) in &entries {
assert_eq!(*i, val.id.load(SeqCst));
assert_eq!(*i, slab.get(*addr).unwrap().id.load(SeqCst));
}
}
}
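// Sanity-check the address math: page boundaries fall at 32, 96, 224, ...
// and addresses round-trip through `as_usize`/`from_usize`.
#[test]
fn address_math() {
    assert_eq!(0, Address::from_usize(0).page());
    assert_eq!(0, Address::from_usize(PAGE_INITIAL_SIZE - 1).page());
    assert_eq!(1, Address::from_usize(PAGE_INITIAL_SIZE).page());
    assert_eq!(1, Address::from_usize(3 * PAGE_INITIAL_SIZE - 1).page());
    assert_eq!(2, Address::from_usize(3 * PAGE_INITIAL_SIZE).page());
    let addr = Address::from_usize(4096);
    assert_eq!(addr, Address::from_usize(addr.as_usize()));
}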
}