use core::cell::RefCell;
use core::hint::spin_loop;
#[cfg(not(loom))]
use core::sync::atomic::{AtomicUsize, Ordering};
#[cfg(loom)]
use loom::sync::atomic::{AtomicUsize, Ordering};
#[cfg(not(loom))]
use alloc::sync::Arc;
#[cfg(loom)]
use loom::sync::Arc;
use alloc::vec::Vec;
use crossbeam_utils::CachePadded;
use super::context::Context;
use super::log::Log;
use super::rwlock::RwLock;
use super::Dispatch;
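/// A token handed out to a thread when it registers with a replica.
///
/// It identifies the thread's per-replica context and must be passed into `execute()`
/// and `execute_mut()`. Tokens should only be obtained through `Replica::register()`;
/// `ReplicaToken::new()` is unsafe because a forged token can alias another thread's
/// context.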
#[derive(Copy, Clone, Debug, PartialEq)]
pub struct ReplicaToken(usize);
#[cfg(feature = "unstable")]
impl !Send for ReplicaToken {}
impl ReplicaToken {
#[doc(hidden)]
pub unsafe fn new(ident: usize) -> Self {
ReplicaToken(ident)
}
pub fn id(&self) -> usize {
self.0
}
}
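/// The maximum number of threads that can be registered with a replica.
/// `register()` returns `None` once this many threads have registered.
/// The value must be a power of two (checked by the `const_assert!` below);
/// a much smaller bound is used under loom to keep model checking tractable.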
#[cfg(not(loom))]
pub const MAX_THREADS_PER_REPLICA: usize = 256;
#[cfg(loom)]
pub const MAX_THREADS_PER_REPLICA: usize = 2;
const_assert!(
MAX_THREADS_PER_REPLICA >= 1 && (MAX_THREADS_PER_REPLICA & (MAX_THREADS_PER_REPLICA - 1) == 0)
);
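/// An instance of a replicated data structure that is kept consistent with other
/// replicas through a shared operation log.
///
/// The type parameter `D` is the underlying sequential data structure; it must
/// implement `Dispatch`. Threads register with the replica via `register()` and then
/// issue mutable operations through `execute_mut()` and read-only operations through
/// `execute()`. Mutable operations are appended to the shared log and eventually
/// applied on every replica that shares the same log; this replica applies them using
/// flat combining, where one thread (the combiner) batches and executes the pending
/// operations of all threads registered here.
///
/// A minimal usage sketch, mirroring the tests at the bottom of this file (`Data` is a
/// stand-in type that implements `Dispatch`):
///
/// ```ignore
/// let log = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
/// let replica = Replica::<Data>::new(&log);
/// let token = replica.register().expect("too many threads registered");
/// let wr = replica.execute_mut(121, token); // mutable (write) operation
/// let rd = replica.execute(11, token);      // read-only operation
/// ```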
pub struct Replica<'a, D>
where
D: Sized + Dispatch + Sync,
{
idx: usize,
combiner: CachePadded<AtomicUsize>,
next: CachePadded<AtomicUsize>,
contexts: Vec<Context<<D as Dispatch>::WriteOperation, <D as Dispatch>::Response>>,
buffer: RefCell<Vec<<D as Dispatch>::WriteOperation>>,
inflight: RefCell<[usize; MAX_THREADS_PER_REPLICA]>,
result: RefCell<Vec<<D as Dispatch>::Response>>,
slog: Arc<Log<'a, <D as Dispatch>::WriteOperation>>,
data: CachePadded<RwLock<D>>,
}
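/// The Replica is Sync. Member variables are protected by the CAS on `combiner`
/// during write operations, and the per-thread contexts are themselves thread-safe.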
unsafe impl<'a, D> Sync for Replica<'a, D> where D: Sized + Sync + Dispatch {}
impl<'a, D> core::fmt::Debug for Replica<'a, D>
where
D: Sized + Sync + Dispatch,
{
fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
write!(f, "Replica")
}
}
impl<'a, D> Replica<'a, D>
where
D: Sized + Default + Dispatch + Sync,
{
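    /// Constructs a replica backed by the shared log `log`, with the data structure
    /// initialized to its `Default` value. Returns the replica wrapped in an `Arc`.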
pub fn new<'b>(log: &Arc<Log<'b, <D as Dispatch>::WriteOperation>>) -> Arc<Replica<'b, D>> {
Replica::with_data(log, Default::default())
}
}
impl<'a, D> Replica<'a, D>
where
D: Sized + Dispatch + Sync,
{
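    /// Similar to `new()`, but constructs the replica around an existing instance `d`
    /// of the data structure instead of `Default::default()`.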
#[cfg(not(feature = "unstable"))]
pub fn with_data<'b>(
log: &Arc<Log<'b, <D as Dispatch>::WriteOperation>>,
d: D,
) -> Arc<Replica<'b, D>> {
let mut contexts = Vec::with_capacity(MAX_THREADS_PER_REPLICA);
for _idx in 0..MAX_THREADS_PER_REPLICA {
contexts.push(Default::default());
}
Arc::new(
Replica {
idx: log.register().unwrap(),
combiner: CachePadded::new(AtomicUsize::new(0)),
next: CachePadded::new(AtomicUsize::new(1)),
contexts,
                buffer: RefCell::new(Vec::with_capacity(
                    MAX_THREADS_PER_REPLICA
                        * Context::<
                            <D as Dispatch>::WriteOperation,
                            <D as Dispatch>::Response,
                        >::batch_size(),
                )),
                inflight: RefCell::new([0; MAX_THREADS_PER_REPLICA]),
                result: RefCell::new(Vec::with_capacity(
                    MAX_THREADS_PER_REPLICA
                        * Context::<
                            <D as Dispatch>::WriteOperation,
                            <D as Dispatch>::Response,
                        >::batch_size(),
                )),
slog: log.clone(),
data: CachePadded::new(RwLock::<D>::new(d)),
},
)
}
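    /// Same as the stable `with_data()`, but writes the `Replica` directly into zeroed
    /// heap memory behind the `Arc` (via `Arc::new_zeroed`) instead of constructing the
    /// large struct on the stack first. Only available with the `unstable` feature.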
#[cfg(feature = "unstable")]
pub fn with_data<'b>(
log: &Arc<Log<'b, <D as Dispatch>::WriteOperation>>,
d: D,
) -> Arc<Replica<'b, D>> {
use core::mem::MaybeUninit;
let mut uninit_replica: Arc<MaybeUninit<Replica<D>>> = Arc::new_zeroed();
unsafe {
let uninit_ptr = Arc::get_mut_unchecked(&mut uninit_replica).as_mut_ptr();
uninit_ptr.write(Replica {
idx: log.register().unwrap(),
combiner: CachePadded::new(AtomicUsize::new(0)),
next: CachePadded::new(AtomicUsize::new(1)),
contexts: Vec::with_capacity(MAX_THREADS_PER_REPLICA),
                buffer: RefCell::new(Vec::with_capacity(
                    MAX_THREADS_PER_REPLICA
                        * Context::<
                            <D as Dispatch>::WriteOperation,
                            <D as Dispatch>::Response,
                        >::batch_size(),
                )),
                inflight: RefCell::new([0; MAX_THREADS_PER_REPLICA]),
                result: RefCell::new(Vec::with_capacity(
                    MAX_THREADS_PER_REPLICA
                        * Context::<
                            <D as Dispatch>::WriteOperation,
                            <D as Dispatch>::Response,
                        >::batch_size(),
                )),
slog: log.clone(),
data: CachePadded::new(RwLock::<D>::new(d)),
});
let mut replica = uninit_replica.assume_init();
for _idx in 0..MAX_THREADS_PER_REPLICA {
Arc::get_mut(&mut replica)
.unwrap()
.contexts
.push(Default::default());
}
replica
}
}
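    /// Registers a thread with this replica. Returns a `ReplicaToken` that must be
    /// passed into `execute()` and `execute_mut()`, or `None` if
    /// `MAX_THREADS_PER_REPLICA` threads have already registered.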
pub fn register(&self) -> Option<ReplicaToken> {
loop {
let idx = self.next.load(Ordering::SeqCst);
if idx > MAX_THREADS_PER_REPLICA {
return None;
};
if self
.next
.compare_exchange_weak(idx, idx + 1, Ordering::SeqCst, Ordering::SeqCst)
!= Ok(idx)
{
continue;
};
return Some(ReplicaToken(idx));
}
}
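    /// Executes a mutable operation against this replica and returns its response.
    /// The operation is enqueued on this thread's context (identified by `idx`), the
    /// thread then attempts flat combining, and finally waits for the response to
    /// appear in its context.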
pub fn execute_mut(
&self,
op: <D as Dispatch>::WriteOperation,
idx: ReplicaToken,
) -> <D as Dispatch>::Response {
while !self.make_pending(op.clone(), idx.0) {}
self.try_combine(idx.0);
self.get_response(idx.0)
}
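    /// Executes a read-only operation against this replica and returns its response.
    /// `idx` is the token returned by `register()` for the calling thread.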
pub fn execute(
&self,
op: <D as Dispatch>::ReadOperation,
idx: ReplicaToken,
) -> <D as Dispatch>::Response {
self.read_only(op, idx.0)
}
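    /// Busy-waits until a response to this thread's pending operation appears in its
    /// context, periodically retrying flat combining in case the operation has not yet
    /// been picked up by a combiner.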
fn get_response(&self, idx: usize) -> <D as Dispatch>::Response {
let mut iter = 0;
let interval = 1 << 29;
loop {
let r = self.contexts[idx - 1].res();
if let Some(resp) = r {
return resp;
}
iter += 1;
if iter == interval {
self.try_combine(idx);
iter = 0;
}
}
}
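    /// Acquires the combiner lock, applies any outstanding operations from the shared
    /// log to this replica, and then invokes the closure `v` on the data structure.
    /// Intended for testing and debugging.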
#[doc(hidden)]
pub fn verify<F: FnMut(&D)>(&self, mut v: F) {
while self.combiner.compare_exchange_weak(
0,
MAX_THREADS_PER_REPLICA + 2,
Ordering::Acquire,
Ordering::Acquire,
) != Ok(0)
{
spin_loop();
}
let mut data = self.data.write(self.next.load(Ordering::Relaxed));
let mut f = |o: <D as Dispatch>::WriteOperation, _i: usize| {
data.dispatch_mut(o);
};
self.slog.exec(self.idx, &mut f);
v(&data);
self.combiner.store(0, Ordering::Release);
}
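    /// Spins until this replica has applied all operations up to the log's current
    /// completed tail, repeatedly attempting to become the combiner so progress is
    /// made even if no other thread is currently combining.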
pub fn sync(&self, idx: ReplicaToken) {
let ctail = self.slog.get_ctail();
while !self.slog.is_replica_synced_for_reads(self.idx, ctail) {
self.try_combine(idx.0);
spin_loop();
}
}
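    /// Issues a read-only operation against the replica. Before dispatching the read,
    /// it makes sure this replica has applied all operations up to the log's completed
    /// tail, combining on behalf of other threads if necessary.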
fn read_only(
&self,
op: <D as Dispatch>::ReadOperation,
tid: usize,
) -> <D as Dispatch>::Response {
let ctail = self.slog.get_ctail();
while !self.slog.is_replica_synced_for_reads(self.idx, ctail) {
self.try_combine(tid);
spin_loop();
}
return self.data.read(tid - 1).dispatch(op);
}
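    /// Enqueues an operation onto this thread's context. Returns `false` if the
    /// context is full and the operation could not be enqueued.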
#[inline(always)]
fn make_pending(&self, op: <D as Dispatch>::WriteOperation, idx: usize) -> bool {
self.contexts[idx - 1].enqueue(op)
}
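    /// Attempts to become the combiner for this replica. First spins a few times
    /// checking whether another thread already holds the combiner lock; if not, tries
    /// to acquire it with a compare-exchange on `combiner` using this thread's `tid`,
    /// performs one round of combining, and then releases the lock.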
fn try_combine(&self, tid: usize) {
for _i in 0..4 {
#[cfg(not(loom))]
if unsafe {
core::ptr::read_volatile(
&self.combiner
as *const crossbeam_utils::CachePadded<core::sync::atomic::AtomicUsize>
as *const usize,
)
} != 0
{
return;
}
#[cfg(loom)]
{
if self.combiner.load(Ordering::Relaxed) != 0 {
loom::thread::yield_now();
return;
}
}
}
if self
.combiner
.compare_exchange_weak(0, tid, Ordering::Acquire, Ordering::Acquire)
!= Ok(0)
{
#[cfg(loom)]
loom::thread::yield_now();
return;
}
self.combine();
self.combiner.store(0, Ordering::Release);
}
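    /// Performs one round of flat combining: collects the pending operations of all
    /// registered threads, appends them to the shared log, executes every outstanding
    /// log entry against the local data structure, and hands the responses back to the
    /// contexts of the threads that issued them.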
#[inline(always)]
fn combine(&self) {
let mut buffer = self.buffer.borrow_mut();
let mut operations = self.inflight.borrow_mut();
let mut results = self.result.borrow_mut();
buffer.clear();
results.clear();
let next = self.next.load(Ordering::Relaxed);
for i in 1..next {
operations[i - 1] = self.contexts[i - 1].ops(&mut buffer);
}
{
let mut data = self.data.write(next);
let f = |o: <D as Dispatch>::WriteOperation, i: usize| {
                let resp = data.dispatch_mut(o);
if i == self.idx {
results.push(resp);
}
};
self.slog.append(&buffer, self.idx, f);
}
{
let mut data = self.data.write(next);
let mut f = |o: <D as Dispatch>::WriteOperation, i: usize| {
let resp = data.dispatch_mut(o);
if i == self.idx {
results.push(resp)
};
};
self.slog.exec(self.idx, &mut f);
}
let (mut s, mut f) = (0, 0);
for i in 1..next {
if operations[i - 1] == 0 {
continue;
};
f += operations[i - 1];
self.contexts[i - 1].enqueue_resps(&results[s..f]);
s += operations[i - 1];
operations[i - 1] = 0;
}
}
}
#[cfg(test)]
mod test {
extern crate std;
use super::*;
use std::vec;
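    // A trivial data structure (a single counter) used to exercise the replica in the
    // tests below.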
#[derive(Default)]
struct Data {
junk: u64,
}
impl Dispatch for Data {
type ReadOperation = u64;
type WriteOperation = u64;
type Response = Result<u64, ()>;
fn dispatch(&self, _op: Self::ReadOperation) -> Self::Response {
Ok(self.junk)
}
fn dispatch_mut(&mut self, _op: Self::WriteOperation) -> Self::Response {
self.junk += 1;
return Ok(107);
}
}
#[test]
fn test_replica_create() {
let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::new(1024));
let repl = Replica::<Data>::new(&slog);
assert_eq!(repl.idx, 1);
assert_eq!(repl.combiner.load(Ordering::SeqCst), 0);
assert_eq!(repl.next.load(Ordering::SeqCst), 1);
assert_eq!(repl.contexts.len(), MAX_THREADS_PER_REPLICA);
assert_eq!(
repl.buffer.borrow().capacity(),
MAX_THREADS_PER_REPLICA * Context::<u64, Result<u64, ()>>::batch_size()
);
assert_eq!(repl.inflight.borrow().len(), MAX_THREADS_PER_REPLICA);
assert_eq!(
repl.result.borrow().capacity(),
MAX_THREADS_PER_REPLICA * Context::<u64, Result<u64, ()>>::batch_size()
);
assert_eq!(repl.data.read(0).junk, 0);
}
#[test]
fn test_replica_register() {
let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::new(1024));
let repl = Replica::<Data>::new(&slog);
assert_eq!(repl.register(), Some(ReplicaToken(1)));
assert_eq!(repl.next.load(Ordering::SeqCst), 2);
repl.next.store(17, Ordering::SeqCst);
assert_eq!(repl.register(), Some(ReplicaToken(17)));
assert_eq!(repl.next.load(Ordering::SeqCst), 18);
}
#[test]
fn test_replica_register_none() {
let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::new(1024));
let repl = Replica::<Data>::new(&slog);
repl.next
.store(MAX_THREADS_PER_REPLICA + 1, Ordering::SeqCst);
assert!(repl.register().is_none());
}
#[test]
fn test_replica_make_pending() {
let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::new(1024));
let repl = Replica::<Data>::new(&slog);
let mut o = vec![];
assert!(repl.make_pending(121, 8));
assert_eq!(repl.contexts[7].ops(&mut o), 1);
assert_eq!(o.len(), 1);
assert_eq!(o[0], 121);
}
#[test]
fn test_replica_make_pending_false() {
let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::new(1024));
let repl = Replica::<Data>::new(&slog);
for _i in 0..Context::<u64, Result<u64, ()>>::batch_size() {
assert!(repl.make_pending(121, 1))
}
assert!(!repl.make_pending(11, 1));
}
#[test]
fn test_replica_try_combine() {
let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
let repl = Replica::<Data>::new(&slog);
let _idx = repl.register();
repl.make_pending(121, 1);
repl.try_combine(1);
assert_eq!(repl.combiner.load(Ordering::SeqCst), 0);
assert_eq!(repl.data.read(0).junk, 1);
assert_eq!(repl.contexts[0].res(), Some(Ok(107)));
}
#[test]
fn test_replica_try_combine_pending() {
let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
let repl = Replica::<Data>::new(&slog);
repl.next.store(9, Ordering::SeqCst);
repl.make_pending(121, 8);
repl.try_combine(1);
assert_eq!(repl.data.read(0).junk, 1);
assert_eq!(repl.contexts[7].res(), Some(Ok(107)));
}
#[test]
fn test_replica_try_combine_fail() {
let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::new(1024));
let repl = Replica::<Data>::new(&slog);
repl.next.store(9, Ordering::SeqCst);
repl.combiner.store(8, Ordering::SeqCst);
repl.make_pending(121, 1);
repl.try_combine(1);
assert_eq!(repl.data.read(0).junk, 0);
assert_eq!(repl.contexts[0].res(), None);
}
#[test]
fn test_replica_execute_combine() {
let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
let repl = Replica::<Data>::new(&slog);
let idx = repl.register().unwrap();
assert_eq!(Ok(107), repl.execute_mut(121, idx));
assert_eq!(1, repl.data.read(0).junk);
}
#[test]
fn test_replica_get_response() {
let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
let repl = Replica::<Data>::new(&slog);
let _idx = repl.register();
repl.make_pending(121, 1);
assert_eq!(repl.get_response(1), Ok(107));
}
#[test]
fn test_replica_execute() {
let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
let repl = Replica::<Data>::new(&slog);
let idx = repl.register().expect("Failed to register with replica.");
assert_eq!(Ok(107), repl.execute_mut(121, idx));
assert_eq!(Ok(1), repl.execute(11, idx));
}
#[test]
fn test_replica_execute_not_synced() {
let slog = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
let repl = Replica::<Data>::new(&slog);
let o = [121, 212];
slog.append(&o, 2, |_o: u64, _i: usize| {});
slog.exec(2, &mut |_o: u64, _i: usize| {});
let t1 = repl.register().expect("Failed to register with replica.");
assert_eq!(Ok(2), repl.execute(11, t1));
}
}