use core::cell::RefCell;
use core::mem::MaybeUninit;
use core::sync::atomic::{spin_loop_hint, AtomicUsize, Ordering};
use alloc::sync::Arc;
use alloc::vec::Vec;
use arr_macro::arr;
use crossbeam_utils::CachePadded;
use super::context::Context;
use super::log::Log;
use super::rwlock::RwLock;
use super::Dispatch;
/// Opaque handle a thread obtains from [`Replica::register`] and passes to the
/// replica's `execute_mut`, `execute` and `sync` calls.
///
/// The token is `!Send`, so it cannot be moved to another thread.
#[derive(Copy, Clone, Debug, PartialEq)]
pub struct ReplicaToken(usize);

impl !Send for ReplicaToken {}

impl ReplicaToken {
    /// Wraps a raw thread identifier in a token.
    ///
    /// # Safety
    ///
    /// Hidden from the public docs. The caller is responsible for `ident` being
    /// a meaningful identifier for the replica it will be used with —
    /// presumably one previously handed out by `Replica::register` (confirm
    /// against callers).
    #[doc(hidden)]
    pub unsafe fn new(ident: usize) -> Self {
        Self(ident)
    }

    /// Returns the raw identifier stored in this token.
    pub fn id(&self) -> usize {
        let Self(ident) = *self;
        ident
    }
}
/// Maximum number of threads that may register with a single [`Replica`].
///
/// Must be a non-zero power of two; the `const_assert!` below rejects any
/// other value at compile time.
pub const MAX_THREADS_PER_REPLICA: usize = 256;
const_assert!(MAX_THREADS_PER_REPLICA.is_power_of_two());
/// A replica of the data structure `D`, kept consistent with other replicas
/// through the shared operation log `slog`.
///
/// Threads register with the replica (`register`) and submit operations via
/// their per-thread `Context`; one thread at a time acts as the combiner and
/// applies the batched operations (`try_combine` / `combine`).
pub struct Replica<'a, D>
where
D: Sized + Default + Dispatch + Sync,
{
/// Identifier this replica obtained from `Log::register` in `Replica::new`;
/// passed to `slog.append` / `slog.exec`.
idx: usize,
/// Combiner slot: 0 when free, otherwise the id of the thread currently
/// combining (set in `try_combine`) or `MAX_THREADS_PER_REPLICA + 2` while
/// `verify` holds it.
combiner: CachePadded<AtomicUsize>,
/// Next thread id `register` will hand out; starts at 1, so thread ids are
/// 1-based and thread `t` uses `contexts[t - 1]`.
next: CachePadded<AtomicUsize>,
/// One context per thread (`MAX_THREADS_PER_REPLICA` are pre-allocated in
/// `new`), holding that thread's pending operations and responses.
contexts: Vec<Context<<D as Dispatch>::WriteOperation, <D as Dispatch>::Response>>,
/// Combiner-only staging buffer: operations gathered from all contexts
/// before they are appended to the log.
buffer: RefCell<Vec<<D as Dispatch>::WriteOperation>>,
/// Combiner-only: number of operations gathered from each thread's context
/// in the current combine round (reset to 0 after responses are delivered).
inflight: RefCell<[usize; MAX_THREADS_PER_REPLICA]>,
/// Combiner-only: responses for this replica's operations, in log order,
/// before they are distributed back to the contexts.
result: RefCell<Vec<<D as Dispatch>::Response>>,
/// The shared log this replica appends to and replays from.
slog: Arc<Log<'a, <D as Dispatch>::WriteOperation>>,
/// The replicated data structure itself, guarded by a reader-writer lock.
data: CachePadded<RwLock<D>>,
}
// SAFETY (review note — please confirm): `Replica` holds `RefCell`s (`buffer`,
// `inflight`, `result`), which are not `Sync` on their own. Outside the tests
// they are only borrowed inside `combine`, and `combine` is only called from
// `try_combine` after it has successfully swapped `combiner` from 0 to the
// caller's id, so at most one thread touches them at a time. Soundness also
// relies on `Context`, `Log` and `RwLock` being safe to share between threads,
// which is not visible in this file.
unsafe impl<'a, D> Sync for Replica<'a, D> where D: Sized + Default + Sync + Dispatch {}
impl<'a, D> core::fmt::Debug for Replica<'a, D>
where
    D: Sized + Default + Sync + Dispatch,
{
    /// Formats the replica as its bare type name; no internal state is printed.
    fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
        f.write_str("Replica")
    }
}
impl<'a, D> Replica<'a, D>
where
    D: Sized + Default + Dispatch + Sync,
{
    /// Creates a new replica and registers it with the shared log `log`.
    ///
    /// The `Replica` is written directly into a zero-initialised `Arc`
    /// allocation rather than being built on the stack and moved. Afterwards
    /// `MAX_THREADS_PER_REPLICA` default thread contexts are pre-allocated, so
    /// `register` never has to grow `contexts`.
    ///
    /// # Panics
    ///
    /// Panics if `log.register()` returns `None`.
    pub fn new<'b>(log: &Arc<Log<'b, <D as Dispatch>::WriteOperation>>) -> Arc<Replica<'b, D>> {
        // The staging buffers must be able to hold one full batch from every thread.
        let staging_capacity = MAX_THREADS_PER_REPLICA
            * Context::<<D as Dispatch>::WriteOperation, <D as Dispatch>::Response>::batch_size();
        let idx = log
            .register()
            .expect("Replica::new: failed to register the replica with the shared log");

        let mut uninit_replica: Arc<MaybeUninit<Replica<D>>> = Arc::new_zeroed();
        // SAFETY: `uninit_replica` was created just above, so this is the only
        // handle to the allocation and `get_mut_unchecked` cannot alias. `write`
        // initialises every field of the `Replica`, which makes `assume_init`
        // sound.
        let mut replica = unsafe {
            let uninit_ptr = Arc::get_mut_unchecked(&mut uninit_replica).as_mut_ptr();
            uninit_ptr.write(Replica {
                idx,
                combiner: CachePadded::new(AtomicUsize::new(0)),
                next: CachePadded::new(AtomicUsize::new(1)),
                contexts: Vec::with_capacity(MAX_THREADS_PER_REPLICA),
                buffer: RefCell::new(Vec::with_capacity(staging_capacity)),
                // `arr!` needs a literal length; it must equal MAX_THREADS_PER_REPLICA
                // (a mismatch is rejected by the type of the `inflight` field).
                inflight: RefCell::new(arr![Default::default(); 256]),
                result: RefCell::new(Vec::with_capacity(staging_capacity)),
                slog: log.clone(),
                data: CachePadded::new(RwLock::<D>::default()),
            });
            uninit_replica.assume_init()
        };

        // No other handle to `replica` exists yet, so `get_mut` cannot fail.
        Arc::get_mut(&mut replica)
            .expect("a freshly created replica must be uniquely owned")
            .contexts
            .resize_with(MAX_THREADS_PER_REPLICA, Default::default);
        replica
    }

    /// Registers the calling thread with this replica.
    ///
    /// Returns a token carrying a fresh identifier in
    /// `1..=MAX_THREADS_PER_REPLICA`, or `None` once every identifier has been
    /// handed out.
    pub fn register(&self) -> Option<ReplicaToken> {
        let mut idx = self.next.load(Ordering::SeqCst);
        loop {
            if idx > MAX_THREADS_PER_REPLICA {
                return None;
            }
            // Claim `idx` by advancing `next`. On contention (or a spurious
            // failure of the weak CAS), retry with the value actually stored.
            match self
                .next
                .compare_exchange_weak(idx, idx + 1, Ordering::SeqCst, Ordering::SeqCst)
            {
                Ok(_) => return Some(ReplicaToken(idx)),
                Err(current) => idx = current,
            }
        }
    }

    /// Executes a mutating operation for the thread identified by `idx` and
    /// returns its response.
    ///
    /// The operation is queued in the thread's context, the thread then tries
    /// to become the combiner, and finally waits until a response has been
    /// posted to its context (by itself or by whichever thread combined).
    pub fn execute_mut(
        &self,
        op: <D as Dispatch>::WriteOperation,
        idx: ReplicaToken,
    ) -> <D as Dispatch>::Response {
        // `make_pending` returns false while the context cannot accept another op.
        while !self.make_pending(op.clone(), idx.0) {
            spin_loop_hint();
        }
        self.try_combine(idx.0);
        self.get_response(idx.0)
    }

    /// Executes a read-only operation for the thread identified by `idx` and
    /// returns its response.
    ///
    /// The replica is first brought up to date with the log (see
    /// `wait_until_synced`), then the operation runs under the read lock.
    pub fn execute(
        &self,
        op: <D as Dispatch>::ReadOperation,
        idx: ReplicaToken,
    ) -> <D as Dispatch>::Response {
        self.read_only(op, idx.0)
    }

    /// Spins until the context of thread `idx` holds a response and returns it.
    ///
    /// If no response appears within `1 << 29` polls, the thread attempts to
    /// combine itself, so that progress is made even if no other thread does.
    fn get_response(&self, idx: usize) -> <D as Dispatch>::Response {
        // The fallback interval is counted in polls. No pause hint is issued
        // here, because slowing each poll would stretch that interval a lot.
        const COMBINE_INTERVAL: usize = 1 << 29;
        let mut polls = 0;
        loop {
            if let Some(resp) = self.contexts[idx - 1].res() {
                return resp;
            }
            polls += 1;
            if polls == COMBINE_INTERVAL {
                self.try_combine(idx);
                polls = 0;
            }
        }
    }

    /// Test/debug hook: applies every outstanding log entry to this replica and
    /// then hands the data to `v` while still holding the write lock.
    #[doc(hidden)]
    pub fn verify<F: FnMut(&D)>(&self, mut v: F) {
        // Occupy the combiner slot with an id no registered thread can have
        // (valid ids are 1..=MAX_THREADS_PER_REPLICA), so no combine runs meanwhile.
        while self
            .combiner
            .compare_exchange_weak(
                0,
                MAX_THREADS_PER_REPLICA + 2,
                Ordering::Acquire,
                Ordering::Acquire,
            )
            .is_err()
        {
            spin_loop_hint();
        }

        let mut data = self.data.write(self.next.load(Ordering::Relaxed));
        let mut apply = |o: <D as Dispatch>::WriteOperation, _i: usize| {
            data.dispatch_mut(o);
        };
        self.slog.exec(self.idx, &mut apply);
        v(&data);

        self.combiner.store(0, Ordering::Release);
    }

    /// Blocks until `is_replica_synced_for_reads` reports this replica as
    /// caught up with the log's `ctail` sampled at the start of the call,
    /// combining as thread `idx` while waiting.
    pub fn sync(&self, idx: ReplicaToken) {
        self.wait_until_synced(idx.0);
    }

    /// Runs a read-only operation for thread `tid` after catching up with the log.
    fn read_only(
        &self,
        op: <D as Dispatch>::ReadOperation,
        tid: usize,
    ) -> <D as Dispatch>::Response {
        self.wait_until_synced(tid);
        self.data.read(tid - 1).dispatch(op)
    }

    /// Spins until this replica has caught up with the log's `ctail` as sampled
    /// on entry. On each iteration it tries to combine as thread `tid`, so that
    /// the replica can actually advance.
    fn wait_until_synced(&self, tid: usize) {
        let ctail = self.slog.get_ctail();
        while !self.slog.is_replica_synced_for_reads(self.idx, ctail) {
            self.try_combine(tid);
            spin_loop_hint();
        }
    }

    /// Queues `op` in the context of thread `idx` (1-based).
    ///
    /// Returns whatever `Context::enqueue` returns, i.e. `false` when the
    /// context could not accept the operation.
    #[inline(always)]
    fn make_pending(&self, op: <D as Dispatch>::WriteOperation, idx: usize) -> bool {
        self.contexts[idx - 1].enqueue(op)
    }

    /// Attempts to become the combiner as thread `tid` and, on success, runs one
    /// round of flat combining before releasing the combiner slot.
    ///
    /// Returns immediately if another thread already holds the slot.
    fn try_combine(&self, tid: usize) {
        // Cheap read-only checks before attempting the CAS. These must be atomic
        // loads: a plain or volatile read racing with the CAS/store is a data race.
        for _ in 0..4 {
            if self.combiner.load(Ordering::Relaxed) != 0 {
                return;
            }
        }

        if self
            .combiner
            .compare_exchange(0, tid, Ordering::Acquire, Ordering::Acquire)
            .is_err()
        {
            return;
        }

        self.combine();

        // All RefCell borrows taken in `combine` have been dropped by now.
        self.combiner.store(0, Ordering::Release);
    }

    /// One round of flat combining.
    ///
    /// Must only be called by the thread holding the combiner slot; the
    /// `RefCell` staging buffers rely on that exclusivity.
    #[inline(always)]
    fn combine(&self) {
        let mut buffer = self.buffer.borrow_mut();
        let mut operations = self.inflight.borrow_mut();
        let mut results = self.result.borrow_mut();

        buffer.clear();
        results.clear();

        let next = self.next.load(Ordering::Relaxed);

        // Gather the pending operations of every registered thread, in id order,
        // remembering how many each thread contributed.
        for i in 1..next {
            operations[i - 1] = self.contexts[i - 1].ops(&mut buffer);
        }

        // Append the batch to the shared log. `append` may invoke the closure on
        // log entries for this replica (it is handed `self.idx`). Responses for
        // this replica's own operations are kept in log order.
        {
            let f = |o: <D as Dispatch>::WriteOperation, i: usize| {
                let resp = self.data.write(next).dispatch_mut(o);
                if i == self.idx {
                    results.push(resp);
                }
            };
            self.slog.append(&buffer, self.idx, f);
        }

        // Replay outstanding log entries against the local data under one write lock.
        {
            let mut data = self.data.write(next);
            let mut f = |o: <D as Dispatch>::WriteOperation, i: usize| {
                let resp = data.dispatch_mut(o);
                if i == self.idx {
                    results.push(resp)
                };
            };
            self.slog.exec(self.idx, &mut f);
        }

        // Hand each thread the contiguous slice of responses matching the
        // operations it contributed, then reset its in-flight count.
        let mut start = 0;
        for i in 1..next {
            let count = operations[i - 1];
            if count == 0 {
                continue;
            }
            let end = start + count;
            self.contexts[i - 1].enqueue_resps(&results[start..end]);
            start = end;
            operations[i - 1] = 0;
        }
    }
}
// Unit tests for `Replica`. They poke at private fields (`next`, `combiner`,
// `contexts`, ...) directly to set up specific states.
#[cfg(test)]
mod test {
extern crate std;
use super::*;
use std::vec;
// Minimal data structure: reads return `junk`; every write increments `junk`
// and returns `Ok(107)`, so tests can count applied writes.
#[derive(Default)]
struct Data {
junk: u64,
}
impl Dispatch for Data {
type ReadOperation = u64;
type WriteOperation = u64;
type Response = Result<u64, ()>;
fn dispatch(&self, _op: Self::ReadOperation) -> Self::Response {
Ok(self.junk)
}
fn dispatch_mut(&mut self, _op: Self::WriteOperation) -> Self::Response {
self.junk += 1;
return Ok(107);
}
}
// A fresh replica is the first one registered with its log (idx 1), has a
// free combiner slot, hands out thread ids starting at 1, and has all
// contexts and staging buffers pre-sized.
#[test]
fn test_replica_create() {
let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::new(1024));
let repl = Replica::<Data>::new(&slog);
assert_eq!(repl.idx, 1);
assert_eq!(repl.combiner.load(Ordering::SeqCst), 0);
assert_eq!(repl.next.load(Ordering::SeqCst), 1);
assert_eq!(repl.contexts.len(), MAX_THREADS_PER_REPLICA);
assert_eq!(
repl.buffer.borrow().capacity(),
MAX_THREADS_PER_REPLICA * Context::<u64, Result<u64, ()>>::batch_size()
);
assert_eq!(repl.inflight.borrow().len(), MAX_THREADS_PER_REPLICA);
assert_eq!(
repl.result.borrow().capacity(),
MAX_THREADS_PER_REPLICA * Context::<u64, Result<u64, ()>>::batch_size()
);
assert_eq!(repl.data.read(0).junk, 0);
}
// `register` returns the current value of `next` and advances it by one.
#[test]
fn test_replica_register() {
let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::new(1024));
let repl = Replica::<Data>::new(&slog);
assert_eq!(repl.register(), Some(ReplicaToken(1)));
assert_eq!(repl.next.load(Ordering::SeqCst), 2);
repl.next.store(17, Ordering::SeqCst);
assert_eq!(repl.register(), Some(ReplicaToken(17)));
assert_eq!(repl.next.load(Ordering::SeqCst), 18);
}
// Once every id up to MAX_THREADS_PER_REPLICA is taken, `register` fails.
#[test]
fn test_replica_register_none() {
let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::new(1024));
let repl = Replica::<Data>::new(&slog);
repl.next
.store(MAX_THREADS_PER_REPLICA + 1, Ordering::SeqCst);
assert!(repl.register().is_none());
}
// Thread ids are 1-based: an op queued for thread 8 lands in contexts[7].
#[test]
fn test_replica_make_pending() {
let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::new(1024));
let repl = Replica::<Data>::new(&slog);
let mut o = vec![];
assert!(repl.make_pending(121, 8));
assert_eq!(repl.contexts[7].ops(&mut o), 1);
assert_eq!(o.len(), 1);
assert_eq!(o[0], 121);
}
// A context accepts exactly `batch_size()` pending ops; the next one is rejected.
#[test]
fn test_replica_make_pending_false() {
let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::new(1024));
let repl = Replica::<Data>::new(&slog);
for _i in 0..Context::<u64, Result<u64, ()>>::batch_size() {
assert!(repl.make_pending(121, 1))
}
assert!(!repl.make_pending(11, 1));
}
// A successful combine applies the pending op, releases the combiner slot
// and posts the response to the submitting thread's context.
#[test]
fn test_replica_try_combine() {
let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
let repl = Replica::<Data>::new(&slog);
let _idx = repl.register();
repl.make_pending(121, 1);
repl.try_combine(1);
assert_eq!(repl.combiner.load(Ordering::SeqCst), 0);
assert_eq!(repl.data.read(0).junk, 1);
assert_eq!(repl.contexts[0].res(), Some(Ok(107)));
}
// The combiner (thread 1) also serves ops queued by other threads: pretend
// eight threads are registered (next = 9) and queue an op for thread 8.
#[test]
fn test_replica_try_combine_pending() {
let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
let repl = Replica::<Data>::new(&slog);
repl.next.store(9, Ordering::SeqCst);
repl.make_pending(121, 8);
repl.try_combine(1);
assert_eq!(repl.data.read(0).junk, 1);
assert_eq!(repl.contexts[7].res(), Some(Ok(107)));
}
// If another thread (id 8) already holds the combiner slot, `try_combine`
// returns without applying anything.
#[test]
fn test_replica_try_combine_fail() {
let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::new(1024));
let repl = Replica::<Data>::new(&slog);
repl.next.store(9, Ordering::SeqCst);
repl.combiner.store(8, Ordering::SeqCst);
repl.make_pending(121, 1);
repl.try_combine(1);
assert_eq!(repl.data.read(0).junk, 0);
assert_eq!(repl.contexts[0].res(), None);
}
// End-to-end write: `execute_mut` returns the op's response and applies it.
#[test]
fn test_replica_execute_combine() {
let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
let repl = Replica::<Data>::new(&slog);
let idx = repl.register().unwrap();
assert_eq!(Ok(107), repl.execute_mut(121, idx));
assert_eq!(1, repl.data.read(0).junk);
}
// No combine has run when `get_response` is called, so it only returns
// after its internal fallback (every 1 << 29 polls) calls `try_combine`.
// NOTE(review): this can be slow, especially in debug builds.
#[test]
fn test_replica_get_response() {
let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
let repl = Replica::<Data>::new(&slog);
let _idx = repl.register();
repl.make_pending(121, 1);
assert_eq!(repl.get_response(1), Ok(107));
}
// A read issued after a write observes that write.
#[test]
fn test_replica_execute() {
let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
let repl = Replica::<Data>::new(&slog);
let idx = repl.register().expect("Failed to register with replica.");
assert_eq!(Ok(107), repl.execute_mut(121, idx));
assert_eq!(Ok(1), repl.execute(11, idx));
}
// Two ops are appended to the log under replica id 2, which this replica
// (idx 1) has not applied yet. `execute` must catch up with the log first,
// so the read observes both increments.
#[test]
fn test_replica_execute_not_synced() {
let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
let repl = Replica::<Data>::new(&slog);
let o = [121, 212];
slog.append(&o, 2, |_o: u64, _i: usize| {});
slog.exec(2, &mut |_o: u64, _i: usize| {});
let t1 = repl.register().expect("Failed to register with replica.");
assert_eq!(Ok(2), repl.execute(11, t1));
}
}