use alloc::alloc::{alloc, dealloc, Layout};
use core::cell::Cell;
use core::default::Default;
use core::fmt;
use core::mem::{align_of, size_of};
use core::ops::{Drop, FnMut};
use core::slice::from_raw_parts_mut;
#[cfg(not(loom))]
use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
#[cfg(loom)]
pub use loom::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use crossbeam_utils::CachePadded;
use crate::context::MAX_PENDING_OPS;
use crate::replica::MAX_THREADS_PER_REPLICA;
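/// Default size of the shared log in bytes (32 MiB); asserted below to be a power of two.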
const DEFAULT_LOG_BYTES: usize = 32 * 1024 * 1024;
const_assert!(DEFAULT_LOG_BYTES >= 1 && (DEFAULT_LOG_BYTES & (DEFAULT_LOG_BYTES - 1) == 0));
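/// Upper bound on replica identifiers for this log; also sizes the per-replica state arrays.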
#[cfg(not(loom))]
pub const MAX_REPLICAS_PER_LOG: usize = 192;
#[cfg(loom)]
pub const MAX_REPLICAS_PER_LOG: usize = 3;
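/// Free space (in entries) that appends try to preserve ahead of the head; once an append
/// would eat into this slack, the log is garbage collected by advancing the head.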
const GC_FROM_HEAD: usize = MAX_PENDING_OPS * MAX_THREADS_PER_REPLICA;
const_assert!(GC_FROM_HEAD >= 1 && (GC_FROM_HEAD & (GC_FROM_HEAD - 1) == 0));
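/// A busy-wait loop logs a warning every time it spins this many iterations.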
const WARN_THRESHOLD: usize = 1 << 28;
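/// A single slot in the shared log. Stores the appended operation, the id of the replica
/// that appended it, and an `alivef` flag whose expected polarity alternates on every wrap
/// of the circular buffer.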
#[derive(Default)]
#[repr(align(64))]
struct Entry<T>
where
T: Sized + Clone,
{
operation: Option<T>,
replica: usize,
alivef: AtomicBool,
}
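/// A shared, circular log of operations.
///
/// Replicas register with the log, append operations of type `T` at the tail, and later
/// execute them in order through [`Log::exec`]. Space is reclaimed lazily by advancing the
/// head once every registered replica has executed the entries in between.
///
/// A minimal usage sketch (illustrative only; it assumes a caller-defined `Operation` type,
/// so it is not compiled as a doc-test):
///
/// ```ignore
/// let log = Log::<Operation>::new(4 * 1024 * 1024);
/// let idx = log.register().expect("no replica identifiers left");
///
/// // The closure is invoked for entries this call executes on our behalf
/// // while helping with garbage collection.
/// log.append(&[Operation::Read], idx, |op, replica| { /* apply op */ });
///
/// // Execute every entry between our local tail and the global tail.
/// log.exec(idx, &mut |op, replica| { /* apply op */ });
/// ```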
#[repr(align(64))]
pub struct Log<'a, T>
where
T: Sized + Clone,
{
rawp: *mut u8,              // raw pointer to the allocation backing `slog`
rawb: usize,                // size in bytes of that allocation (needed by `Drop`)
size: usize,                // number of entries in the log; always a power of two
slog: &'a [Cell<Entry<T>>], // the circular buffer of log entries
head: CachePadded<AtomicUsize>,  // logical index of the oldest entry that may still be needed
tail: CachePadded<AtomicUsize>,  // logical index one past the most recently appended entry
ctail: CachePadded<AtomicUsize>, // completed tail: largest tail value any replica has executed up to
ltails: [CachePadded<AtomicUsize>; MAX_REPLICAS_PER_LOG], // per-replica local tails
next: CachePadded<AtomicUsize>,  // identifier to hand out to the next replica that registers
lmasks: [CachePadded<Cell<bool>>; MAX_REPLICAS_PER_LOG],  // per-replica expected `alivef` polarity
}
impl<'a, T> fmt::Debug for Log<'a, T>
where
T: Sized + Clone,
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("Log")
.field("head", &self.tail)
.field("tail", &self.head)
.field("size", &self.size)
.finish()
}
}
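// Safety argument for the manual `Send`/`Sync` impls: entries and per-replica state are
// only mutated through the append/exec protocol below, which synchronizes via the atomic
// head, tail, and `alivef` fields.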
unsafe impl<'a, T> Send for Log<'a, T> where T: Sized + Clone {}
unsafe impl<'a, T> Sync for Log<'a, T> where T: Sized + Clone {}
impl<'a, T> Log<'a, T>
where
T: Sized + Clone,
{
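/// Constructs a new log from a requested size in `bytes`. The entry count is derived from
/// `bytes`, clamped to at least `2 * GC_FROM_HEAD`, and rounded up to the next power of
/// two. Panics if the backing memory cannot be allocated.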
pub fn new<'b>(bytes: usize) -> Log<'b, T> {
let mut num = bytes / Log::<T>::entry_size();
if num < 2 * GC_FROM_HEAD {
num = 2 * GC_FROM_HEAD;
}
if !num.is_power_of_two() {
num = num.checked_next_power_of_two().unwrap_or(2 * GC_FROM_HEAD)
};
let b = num * Log::<T>::entry_size();
let mem = unsafe {
alloc(
Layout::from_size_align(b, align_of::<Cell<Entry<T>>>())
.expect("Alignment error while allocating the shared log!"),
)
};
if mem.is_null() {
panic!("Failed to allocate memory for the shared log!");
}
let raw = unsafe { from_raw_parts_mut(mem as *mut Cell<Entry<T>>, num) };
for e in raw.iter_mut() {
unsafe {
::core::ptr::write(
e,
Cell::new(Entry {
operation: None,
replica: 0usize,
alivef: AtomicBool::new(false),
}),
);
}
}
#[allow(clippy::declare_interior_mutable_const)]
const LMASK_DEFAULT: CachePadded<Cell<bool>> = CachePadded::new(Cell::new(true));
#[cfg(not(loom))]
{
#[allow(clippy::declare_interior_mutable_const)]
const LTAIL_DEFAULT: CachePadded<AtomicUsize> = CachePadded::new(AtomicUsize::new(0));
Log {
rawp: mem,
rawb: b,
size: num,
slog: raw,
head: CachePadded::new(AtomicUsize::new(0usize)),
tail: CachePadded::new(AtomicUsize::new(0usize)),
ctail: CachePadded::new(AtomicUsize::new(0usize)),
ltails: [LTAIL_DEFAULT; MAX_REPLICAS_PER_LOG],
next: CachePadded::new(AtomicUsize::new(1usize)),
lmasks: [LMASK_DEFAULT; MAX_REPLICAS_PER_LOG],
}
}
#[cfg(loom)]
{
use arr_macro::arr;
Log {
rawp: mem,
rawb: b,
size: num,
slog: raw,
head: CachePadded::new(AtomicUsize::new(0usize)),
tail: CachePadded::new(AtomicUsize::new(0usize)),
ctail: CachePadded::new(AtomicUsize::new(0usize)),
ltails: arr![CachePadded::new(AtomicUsize::new(0)); 3],
next: CachePadded::new(AtomicUsize::new(1usize)),
lmasks: [LMASK_DEFAULT; MAX_REPLICAS_PER_LOG],
}
}
}
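/// Size in bytes of a single (cache-line aligned) log entry.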
fn entry_size() -> usize {
size_of::<Cell<Entry<T>>>()
}
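/// Registers a replica with the log and returns the identifier to use in subsequent
/// `append`/`exec` calls. Identifiers start at 1; `None` is returned once `next` reaches
/// `MAX_REPLICAS_PER_LOG`.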
pub(crate) fn register(&self) -> Option<usize> {
loop {
let n = self.next.load(Ordering::Relaxed);
if n >= MAX_REPLICAS_PER_LOG {
return None;
};
if self
.next
.compare_exchange_weak(n, n + 1, Ordering::SeqCst, Ordering::SeqCst)
!= Ok(n)
{
continue;
};
return Some(n);
}
}
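/// Appends `ops` to the log on behalf of replica `idx`. When the log runs low on space,
/// the call first helps with garbage collection by executing outstanding entries through
/// `s` and, if needed, advancing the head.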
#[inline(always)]
#[doc(hidden)]
pub fn append<F: FnMut(T, usize)>(&self, ops: &[T], idx: usize, mut s: F) {
let nops = ops.len();
let mut iteration = 1;
let mut waitgc = 1;
loop {
if iteration % WARN_THRESHOLD == 0 {
warn!(
"append(ops.len()={}, {}) takes too many iterations ({}) to complete...",
ops.len(),
idx,
iteration,
);
}
iteration += 1;
let tail = self.tail.load(Ordering::Relaxed);
let head = self.head.load(Ordering::Relaxed);
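// If the log has already eaten into the GC_FROM_HEAD slack, help with garbage
// collection by executing outstanding operations, then retry the append.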
if tail > head + self.size - GC_FROM_HEAD {
if waitgc % WARN_THRESHOLD == 0 {
warn!(
"append(ops.len()={}, {}) takes too many iterations ({}) waiting for gc...",
ops.len(),
idx,
waitgc,
);
}
waitgc += 1;
self.exec(idx, &mut s);
#[cfg(loom)]
loom::thread::yield_now();
continue;
}
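// Reserve space for `nops` entries by bumping the tail; if the append eats into the
// GC_FROM_HEAD slack, remember to advance the head afterwards.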
let mut advance = false;
if tail + nops > head + self.size - GC_FROM_HEAD {
advance = true
};
if self.tail.compare_exchange_weak(
tail,
tail + nops,
Ordering::Acquire,
Ordering::Acquire,
) != Ok(tail)
{
continue;
};
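// Fill the reserved slots. `alivef` is written last, with Release ordering, so consumers
// never observe a partially initialized entry; its polarity flips each time the log wraps.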
for (i, op) in ops.iter().enumerate().take(nops) {
let e = self.slog[self.index(tail + i)].as_ptr();
let mut m = self.lmasks[idx - 1].get();
if unsafe { (*e).alivef.load(Ordering::Relaxed) == m } {
m = !m;
}
unsafe { (*e).operation = Some(op.clone()) };
unsafe { (*e).replica = idx };
unsafe { (*e).alivef.store(m, Ordering::Release) };
}
if advance {
self.advance_head(idx, &mut s);
}
return;
}
}
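/// Executes, in order, every entry between replica `idx`'s local tail and the global tail
/// by invoking `d` on each operation, then advances the local tail (and the completed
/// tail) past them.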
#[inline(always)]
pub(crate) fn exec<F: FnMut(T, usize)>(&self, idx: usize, d: &mut F) {
let ltail = self.ltails[idx - 1].load(Ordering::Relaxed);
let gtail = self.tail.load(Ordering::Relaxed);
if ltail == gtail {
return;
}
let h = self.head.load(Ordering::Relaxed);
if ltail > gtail || ltail < h {
panic!("Local tail not within the shared log!")
};
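// Execute everything between the local and global tails, waiting for each entry's
// `alivef` flag to match this replica's current mask.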
for i in ltail..gtail {
let mut iteration = 1;
let e = self.slog[self.index(i)].as_ptr();
while unsafe { (*e).alivef.load(Ordering::Acquire) != self.lmasks[idx - 1].get() } {
if iteration % WARN_THRESHOLD == 0 {
warn!(
"alivef not being set for self.index(i={}) = {} (self.lmasks[{}] is {})...",
i,
self.index(i),
idx - 1,
self.lmasks[idx - 1].get()
);
}
iteration += 1;
#[cfg(loom)]
loom::thread::yield_now();
}
unsafe { d((*e).operation.as_ref().unwrap().clone(), (*e).replica) };
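// Producers flip `alivef` polarity when they wrap around the buffer, so flip our mask
// once we have consumed the last slot.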
if self.index(i) == self.size - 1 {
self.lmasks[idx - 1].set(!self.lmasks[idx - 1].get());
}
}
self.ctail.fetch_max(gtail, Ordering::Relaxed);
self.ltails[idx - 1].store(gtail, Ordering::Relaxed);
}
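/// Maps a monotonically increasing logical offset onto a slot in the circular buffer;
/// relies on `size` being a power of two.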
#[inline(always)]
fn index(&self, logical: usize) -> usize {
logical & (self.size - 1)
}
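/// Advances the head of the log past entries that every registered replica has already
/// executed, executing on behalf of `rid` while the slowest replica has made no progress
/// beyond the current head.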
#[inline(always)]
fn advance_head<F: FnMut(T, usize)>(&self, rid: usize, mut s: &mut F) {
let mut iteration = 1;
loop {
let r = self.next.load(Ordering::Relaxed);
let global_head = self.head.load(Ordering::Relaxed);
let f = self.tail.load(Ordering::Relaxed);
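// The head may only advance to the smallest local tail across all registered replicas.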
let mut min_local_tail = self.ltails[0].load(Ordering::Relaxed);
for idx in 1..r {
let cur_local_tail = self.ltails[idx - 1].load(Ordering::Relaxed);
if min_local_tail > cur_local_tail {
min_local_tail = cur_local_tail
};
}
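// The slowest replica is still at the current head: nothing can be reclaimed yet, so
// execute our own outstanding entries and retry.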
if min_local_tail == global_head {
if iteration % WARN_THRESHOLD == 0 {
warn!("Spending a long time in `advance_head`, are we starving?");
}
iteration += 1;
self.exec(rid, &mut s);
#[cfg(loom)]
loom::thread::yield_now();
continue;
}
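// Publish the new head. If the log is still nearly full, execute outstanding entries
// before returning to the append path.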
self.head.store(min_local_tail, Ordering::Relaxed);
if f < min_local_tail + self.size - GC_FROM_HEAD {
return;
} else {
self.exec(rid, &mut s);
}
}
}
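/// Resets the log to its freshly constructed state. Callers must ensure no other thread
/// is using the log concurrently.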
#[doc(hidden)]
#[inline(always)]
pub unsafe fn reset(&self) {
self.head.store(0, Ordering::SeqCst);
self.tail.store(0, Ordering::SeqCst);
self.next.store(1, Ordering::SeqCst);
for r in 0..MAX_REPLICAS_PER_LOG {
self.ltails[r].store(0, Ordering::Relaxed);
self.lmasks[r].set(true);
}
for i in 0..self.size {
let e = self.slog[self.index(i)].as_ptr();
(*e).alivef.store(false, Ordering::Release);
}
}
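/// Returns true if replica `idx` has executed every operation up to the completed-tail
/// value `ctail`, i.e. it is sufficiently up to date to serve reads issued at that point.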
#[inline(always)]
pub(crate) fn is_replica_synced_for_reads(&self, idx: usize, ctail: usize) -> bool {
self.ltails[idx - 1].load(Ordering::Relaxed) >= ctail
}
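/// Returns the current completed tail of the log.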
#[inline(always)]
pub(crate) fn get_ctail(&self) -> usize {
self.ctail.load(Ordering::Relaxed)
}
}
impl<'a, T> Default for Log<'a, T>
where
T: Sized + Clone,
{
fn default() -> Self {
Log::new(DEFAULT_LOG_BYTES)
}
}
impl<'a, T> Drop for Log<'a, T>
where
T: Sized + Clone,
{
fn drop(&mut self) {
unsafe {
dealloc(
self.rawp,
Layout::from_size_align(self.rawb, align_of::<Cell<Entry<T>>>())
.expect("Alignment error while deallocating the shared log!"),
)
};
}
}
#[cfg(test)]
mod tests {
extern crate std;
use super::*;
use std::sync::Arc;
#[derive(Clone, Debug, PartialEq)]
enum Operation {
Read,
Write(u64),
Invalid,
}
impl Default for Operation {
fn default() -> Operation {
Operation::Invalid
}
}
#[test]
fn test_entry_create_default() {
let e = Entry::<Operation>::default();
assert_eq!(e.operation, None);
assert_eq!(e.replica, 0);
assert_eq!(e.alivef.load(Ordering::Relaxed), false);
}
#[test]
fn test_log_entry_size() {
assert_eq!(Log::<Operation>::entry_size(), 64);
}
#[test]
fn test_log_create() {
let l = Log::<Operation>::new(1024 * 1024);
let n = (1024 * 1024) / Log::<Operation>::entry_size();
assert_eq!(l.rawb, 1024 * 1024);
assert_eq!(l.size, n);
assert_eq!(l.slog.len(), n);
assert_eq!(l.head.load(Ordering::Relaxed), 0);
assert_eq!(l.tail.load(Ordering::Relaxed), 0);
assert_eq!(l.next.load(Ordering::Relaxed), 1);
assert_eq!(l.ctail.load(Ordering::Relaxed), 0);
for i in 0..MAX_REPLICAS_PER_LOG {
assert_eq!(l.ltails[i].load(Ordering::Relaxed), 0);
}
for i in 0..MAX_REPLICAS_PER_LOG {
assert_eq!(l.lmasks[i].get(), true);
}
}
#[test]
fn test_log_min_size() {
let l = Log::<Operation>::new(1024);
assert_eq!(l.rawb, 2 * GC_FROM_HEAD * Log::<Operation>::entry_size());
assert_eq!(l.size, 2 * GC_FROM_HEAD);
assert_eq!(l.slog.len(), 2 * GC_FROM_HEAD);
}
#[test]
fn test_log_power_of_two() {
let l = Log::<Operation>::new(524 * 1024);
let n = ((524 * 1024) / Log::<Operation>::entry_size()).checked_next_power_of_two();
assert_eq!(l.rawb, n.unwrap() * Log::<Operation>::entry_size());
assert_eq!(l.size, n.unwrap());
assert_eq!(l.slog.len(), n.unwrap());
}
#[test]
fn test_log_create_default() {
let l = Log::<Operation>::default();
let n = DEFAULT_LOG_BYTES / Log::<Operation>::entry_size();
assert_eq!(l.rawb, DEFAULT_LOG_BYTES);
assert_eq!(l.size, n);
assert_eq!(l.slog.len(), n);
assert_eq!(l.head.load(Ordering::Relaxed), 0);
assert_eq!(l.tail.load(Ordering::Relaxed), 0);
assert_eq!(l.next.load(Ordering::Relaxed), 1);
assert_eq!(l.ctail.load(Ordering::Relaxed), 0);
for i in 0..MAX_REPLICAS_PER_LOG {
assert_eq!(l.ltails[i].load(Ordering::Relaxed), 0);
}
for i in 0..MAX_REPLICAS_PER_LOG {
assert_eq!(l.lmasks[i].get(), true);
}
}
#[test]
fn test_log_index() {
let l = Log::<Operation>::new(2 * 1024 * 1024);
assert_eq!(l.index(99000), 696);
}
#[test]
fn test_log_register() {
let l = Log::<Operation>::new(1024);
assert_eq!(l.register(), Some(1));
assert_eq!(l.next.load(Ordering::Relaxed), 2);
}
#[test]
fn test_log_register_none() {
let l = Log::<Operation>::new(1024);
l.next.store(MAX_REPLICAS_PER_LOG, Ordering::Relaxed);
assert!(l.register().is_none());
assert_eq!(l.next.load(Ordering::Relaxed), MAX_REPLICAS_PER_LOG);
}
#[test]
fn test_log_append() {
let l = Log::<Operation>::default();
let o = [Operation::Read];
l.append(&o, 1, |_o: Operation, _i: usize| {});
assert_eq!(l.head.load(Ordering::Relaxed), 0);
assert_eq!(l.tail.load(Ordering::Relaxed), 1);
let slog = l.slog[0].take();
assert_eq!(slog.operation, Some(Operation::Read));
assert_eq!(slog.replica, 1);
}
#[test]
fn test_log_append_multiple() {
let l = Log::<Operation>::default();
let o = [Operation::Read, Operation::Write(119)];
l.append(&o, 1, |_o: Operation, _i: usize| {});
assert_eq!(l.head.load(Ordering::Relaxed), 0);
assert_eq!(l.tail.load(Ordering::Relaxed), 2);
}
#[test]
fn test_log_advance_head() {
let l = Log::<Operation>::default();
l.next.store(5, Ordering::Relaxed);
l.ltails[0].store(1023, Ordering::Relaxed);
l.ltails[1].store(224, Ordering::Relaxed);
l.ltails[2].store(4096, Ordering::Relaxed);
l.ltails[3].store(799, Ordering::Relaxed);
l.advance_head(0, &mut |_o: Operation, _i: usize| {});
assert_eq!(l.head.load(Ordering::Relaxed), 224);
}
#[test]
fn test_log_append_gc() {
let l = Log::<Operation>::default();
let o: [Operation; 4] = unsafe {
let mut a: [Operation; 4] = ::std::mem::MaybeUninit::zeroed().assume_init();
for i in &mut a[..] {
::std::ptr::write(i, Operation::Read);
}
a
};
l.next.store(2, Ordering::Relaxed);
l.tail.store(l.size - GC_FROM_HEAD - 1, Ordering::Relaxed);
l.ltails[0].store(1024, Ordering::Relaxed);
l.append(&o, 1, |_o: Operation, _i: usize| {});
assert_eq!(l.head.load(Ordering::Relaxed), 1024);
assert_eq!(l.tail.load(Ordering::Relaxed), l.size - GC_FROM_HEAD + 3);
}
#[test]
fn test_log_append_wrap() {
let l = Log::<Operation>::default();
let o: [Operation; 1024] = unsafe {
let mut a: [Operation; 1024] = ::std::mem::MaybeUninit::zeroed().assume_init();
for i in &mut a[..] {
::std::ptr::write(i, Operation::Read);
}
a
};
l.next.store(2, Ordering::Relaxed);
l.head.store(2 * 8192, Ordering::Relaxed);
l.tail.store(l.size - 10, Ordering::Relaxed);
l.append(&o, 1, |_o: Operation, _i: usize| {});
assert_eq!(l.lmasks[0].get(), true);
assert_eq!(l.tail.load(Ordering::Relaxed), l.size + 1014);
}
#[test]
fn test_log_exec() {
let l = Log::<Operation>::default();
let o = [Operation::Read];
let mut f = |op: Operation, i: usize| {
assert_eq!(op, Operation::Read);
assert_eq!(i, 1);
};
l.append(&o, 1, |_o: Operation, _i: usize| {});
l.exec(1, &mut f);
assert_eq!(
l.tail.load(Ordering::Relaxed),
l.ctail.load(Ordering::Relaxed)
);
assert_eq!(
l.tail.load(Ordering::Relaxed),
l.ltails[0].load(Ordering::Relaxed)
);
}
#[test]
fn test_log_exec_empty() {
let l = Log::<Operation>::default();
let mut f = |_o: Operation, _i: usize| {
assert!(false);
};
l.exec(1, &mut f);
}
#[test]
fn test_log_exec_zero() {
let l = Log::<Operation>::default();
let o = [Operation::Read];
let mut f = |op: Operation, i: usize| {
assert_eq!(op, Operation::Read);
assert_eq!(i, 1);
};
let mut g = |_op: Operation, _i: usize| {
assert!(false);
};
l.append(&o, 1, |_o: Operation, _i: usize| {});
l.exec(1, &mut f);
l.exec(1, &mut g);
}
#[test]
fn test_log_exec_multiple() {
let l = Log::<Operation>::default();
let o = [Operation::Read, Operation::Write(119)];
let mut s = 0;
let mut f = |op: Operation, _i: usize| match op {
Operation::Read => s += 121,
Operation::Write(v) => s += v,
Operation::Invalid => assert!(false),
};
l.append(&o, 1, |_o: Operation, _i: usize| {});
l.exec(1, &mut f);
assert_eq!(s, 240);
assert_eq!(
l.tail.load(Ordering::Relaxed),
l.ctail.load(Ordering::Relaxed)
);
assert_eq!(
l.tail.load(Ordering::Relaxed),
l.ltails[0].load(Ordering::Relaxed)
);
}
#[test]
fn test_log_exec_wrap() {
let l = Log::<Operation>::default();
let o: [Operation; 1024] = unsafe {
let mut a: [Operation; 1024] = ::std::mem::MaybeUninit::zeroed().assume_init();
for i in &mut a[..] {
::std::ptr::write(i, Operation::Read);
}
a
};
let mut f = |op: Operation, i: usize| {
assert_eq!(op, Operation::Read);
assert_eq!(i, 1);
};
l.append(&o, 1, |_o: Operation, _i: usize| {});
l.next.store(2, Ordering::SeqCst);
l.head.store(2 * 8192, Ordering::SeqCst);
l.tail.store(l.size - 10, Ordering::SeqCst);
l.append(&o, 1, |_o: Operation, _i: usize| {});
l.ltails[0].store(l.size - 10, Ordering::SeqCst);
l.exec(1, &mut f);
assert_eq!(l.lmasks[0].get(), false);
assert_eq!(l.tail.load(Ordering::Relaxed), l.size + 1014);
}
#[test]
#[should_panic]
fn test_exec_panic() {
let l = Log::<Operation>::default();
let o: [Operation; 1024] = unsafe {
let mut a: [Operation; 1024] = ::std::mem::MaybeUninit::zeroed().assume_init();
for i in &mut a[..] {
::std::ptr::write(i, Operation::Read);
}
a
};
let mut f = |_op: Operation, _i: usize| {
assert!(false);
};
l.append(&o, 1, |_o: Operation, _i: usize| {});
l.head.store(8192, Ordering::SeqCst);
l.exec(1, &mut f);
}
#[test]
fn test_log_change_refcount() {
let l = Log::<Arc<Operation>>::default();
let o1 = [Arc::new(Operation::Read)];
let o2 = [Arc::new(Operation::Read)];
assert_eq!(Arc::strong_count(&o1[0]), 1);
assert_eq!(Arc::strong_count(&o2[0]), 1);
l.append(&o1[..], 1, |_o: Arc<Operation>, _i: usize| {});
assert_eq!(Arc::strong_count(&o1[0]), 2);
l.append(&o1[..], 1, |_o: Arc<Operation>, _i: usize| {});
assert_eq!(Arc::strong_count(&o1[0]), 3);
unsafe { l.reset() };
l.append(&o2[..], 1, |_o: Arc<Operation>, _i: usize| {});
assert_eq!(Arc::strong_count(&o1[0]), 2);
assert_eq!(Arc::strong_count(&o2[0]), 2);
l.append(&o2[..], 1, |_o: Arc<Operation>, _i: usize| {});
assert_eq!(Arc::strong_count(&o1[0]), 1);
assert_eq!(Arc::strong_count(&o2[0]), 3);
}
#[test]
fn test_log_refcount_change_with_gc() {
let entry_size = 64;
let total_entries = 16384;
assert_eq!(Log::<Arc<Operation>>::entry_size(), entry_size);
let size: usize = total_entries * entry_size;
let l = Log::<Arc<Operation>>::new(size);
let o1 = [Arc::new(Operation::Read)];
let o2 = [Arc::new(Operation::Read)];
assert_eq!(Arc::strong_count(&o1[0]), 1);
assert_eq!(Arc::strong_count(&o2[0]), 1);
for i in 1..(total_entries + 1) {
l.append(&o1[..], 1, |_o: Arc<Operation>, _i: usize| {});
assert_eq!(Arc::strong_count(&o1[0]), i + 1);
}
assert_eq!(Arc::strong_count(&o1[0]), total_entries + 1);
for i in 1..(total_entries + 1) {
l.append(&o2[..], 1, |_o: Arc<Operation>, _i: usize| {});
assert_eq!(Arc::strong_count(&o1[0]), (total_entries + 1) - i);
assert_eq!(Arc::strong_count(&o2[0]), i + 1);
}
assert_eq!(Arc::strong_count(&o1[0]), 1);
assert_eq!(Arc::strong_count(&o2[0]), total_entries + 1);
}
#[test]
fn test_replica_synced_for_read() {
let l = Log::<Operation>::default();
let one = l.register().unwrap();
let two = l.register().unwrap();
assert_eq!(one, 1);
assert_eq!(two, 2);
let o = [Operation::Read];
let mut f = |op: Operation, i: usize| {
assert_eq!(op, Operation::Read);
assert_eq!(i, 1);
};
l.append(&o, one, |_o: Operation, _i: usize| {});
l.exec(one, &mut f);
assert_eq!(l.is_replica_synced_for_reads(one, l.get_ctail()), true);
assert_eq!(l.is_replica_synced_for_reads(two, l.get_ctail()), false);
l.exec(two, &mut f);
assert_eq!(l.is_replica_synced_for_reads(two, l.get_ctail()), true);
}
}