use std::{collections::VecDeque, ops::{Deref, DerefMut}, sync::{atomic::{AtomicUsize, Ordering}, Arc}};
use parking_lot::{RwLock, RwLockReadGuard};
use crate::utils::PanicHelper;
/// A state that evolves only by applying authority actions, one sequence step at a time.
///
/// The updaters in this file keep several clones of the state and bring each
/// clone forward by replaying the same queued authority actions in order.
pub trait DeterministicState: Clone + Send + Sync + Sized + 'static {
    /// Request handed to the leader; turned into an [`Self::AuthorityAction`] by
    /// [`DeterministicState::authority`].
    type Action: Send + Sync + Sized + 'static;

    /// The action that is actually applied to the state by
    /// [`DeterministicState::update`].
    type AuthorityAction: Send + Sync + Sized + 'static;

    /// Current sequence number of the state.
    ///
    /// `StateUpdater::update` asserts that this value grows by exactly one for
    /// every call to [`DeterministicState::update`].
    fn sequence(&self) -> u64;

    /// Converts a requested `action` into the authority action to apply.
    ///
    /// `LeaderUpdater::queue` calls this on the leader's copy of the state
    /// right before applying the result with [`DeterministicState::update`].
    fn authority(&self, action: Self::Action) -> Self::AuthorityAction;

    /// Applies `action` to the state.
    fn update(&mut self, action: &Self::AuthorityAction);
}
/// Read handle onto the state copies written by a `StateUpdater`.
///
/// Cloning the handle clones the `Arc`, so all clones observe the same storage.
pub struct SharedState<D: DeterministicState> {
// Storage shared with the updater and with every clone of this handle.
inner: Arc<StateInner<D>>,
}
impl<D: DeterministicState + Clone> SharedState<D> {
    /// Creates the shared storage for `state` and returns a read handle together
    /// with an updater that starts out in the leader role.
    ///
    /// Three copies of `state` exist afterwards: one in each of the two
    /// lock-protected slots, and the original, which is kept by the leader.
    /// The updater's next expected sequence is `state.sequence()`.
    pub fn new(state: D) -> (Self, SharedStateUpdater<D>) {
        let queue_next_sequence = state.sequence();

        // Both slots start out as clones of the initial state.
        let replica = || RwLock::new(state.clone());
        let inner = Arc::new(StateInner {
            read_pos: AtomicUsize::new(0),
            states: [replica(), replica()],
        });

        let shared = SharedState {
            inner: Arc::clone(&inner),
        };
        let updater = StateUpdater {
            inner,
            queue: VecDeque::new(),
            queue_offset: [0, 0],
            queue_next_sequence,
        };
        let leader = LeaderUpdater { state, updater };

        (shared, SharedStateUpdater::Leader(leader))
    }
}
impl<D: DeterministicState> Clone for SharedState<D> {
fn clone(&self) -> Self {
SharedState { inner: self.inner.clone() }
}
}
impl<D: DeterministicState> SharedState<D> {
    /// Acquires a read guard on the copy of the state that readers are
    /// currently directed to.
    ///
    /// # Panics
    ///
    /// Panics if `StateInner::read` gives up after exhausting its spin attempts.
    pub fn read(&self) -> RwLockReadGuard<'_, D> {
        let guard = self.inner.read();
        guard
    }
}
impl<D: DeterministicState> StateInner<D> {
    /// Acquires a read guard on the slot selected by the lowest bit of `read_pos`.
    ///
    /// The position is re-loaded on every attempt, so a reader that loses the
    /// race against the writer picks up the newly published slot on its next try.
    ///
    /// # Panics
    ///
    /// Panics with "failed to acquire read lock" after 1,048,576 failed attempts.
    pub fn read(&self) -> RwLockReadGuard<'_, D> {
        const MAX_ATTEMPTS: u32 = 1 << 20;

        let mut attempt: u32 = 0;
        while attempt < MAX_ATTEMPTS {
            let slot = self.read_pos.load(Ordering::Acquire) & 0x1;
            match self.states[slot].try_read() {
                Some(guard) => return guard,
                None => std::hint::spin_loop(),
            }
            attempt += 1;
        }

        panic!("failed to acquire read lock");
    }
}
/// Updater for the shared state, in exactly one of two roles.
pub enum SharedStateUpdater<D: DeterministicState> {
/// Keeps its own copy of the state and converts `Action`s into authority actions.
Leader(LeaderUpdater<D>),
/// Queues authority actions that arrive with an explicit sequence number.
Follower(FollowUpdater<D>),
}
impl<D: DeterministicState> SharedStateUpdater<D> {
    /// Returns a new read handle onto the storage this updater writes to.
    pub fn shared(&self) -> SharedState<D> {
        match self {
            Self::Leader(l) => l.updater.shared(),
            Self::Follower(f) => f.updater.shared(),
        }
    }

    /// Sequence number the next queued action will be given (leader) or must
    /// carry (follower).
    ///
    /// # Panics
    ///
    /// In the leader role, panics if the queue's next sequence differs from
    /// the sequence of the leader's own state copy.
    pub fn next_sequence(&self) -> u64 {
        match self {
            Self::Leader(l) => {
                let v = l.updater.queue_next_sequence;
                assert_eq!(v, l.state.sequence());
                v
            }
            Self::Follower(f) => f.updater.queue_next_sequence,
        }
    }

    /// Runs one `StateUpdater::update` pass and returns whether any queued
    /// action was applied during it.
    pub fn update(&mut self) -> bool {
        match self {
            Self::Leader(l) => l.updater.update(),
            Self::Follower(f) => f.updater.update(),
        }
    }

    /// Converts into the follower role; a leader is demoted via `LeaderUpdater::follow`.
    pub fn into_follow(self) -> FollowUpdater<D> {
        match self {
            Self::Follower(f) => f,
            Self::Leader(l) => l.follow(),
        }
    }

    /// Converts into the leader role; a follower is promoted via `FollowUpdater::lead`.
    pub fn into_lead(self) -> LeaderUpdater<D> {
        match self {
            Self::Leader(l) => l,
            Self::Follower(f) => f.lead(),
        }
    }

    /// Returns the leader updater, or `None` when in the follower role.
    pub fn as_leader(&mut self) -> Option<&mut LeaderUpdater<D>> {
        match self {
            Self::Leader(l) => Some(l),
            Self::Follower(_) => None,
        }
    }

    /// Returns the follower updater, or `None` when in the leader role.
    ///
    /// This used to be a verbatim copy of `as_leader` (it matched `Leader` and
    /// returned a `LeaderUpdater`), which made the follower unreachable through
    /// this accessor.
    pub fn as_follower(&mut self) -> Option<&mut FollowUpdater<D>> {
        match self {
            Self::Follower(f) => Some(f),
            Self::Leader(_) => None,
        }
    }
}
/// Updater in the leader role: owns an up-to-date copy of the state and
/// produces the authority actions that get queued.
pub struct LeaderUpdater<D: DeterministicState> {
// Queue and writer for the two shared copies.
updater: StateUpdater<D>,
// Leader-local copy; every queued action is applied to it immediately.
state: D,
}
impl<D: DeterministicState> LeaderUpdater<D> {
    /// Turns `action` into an authority action, applies it to the leader's own
    /// state copy and queues it for the shared copies.
    ///
    /// Returns the sequence number the action was queued under (the state's
    /// sequence before the update) and a reference to the queued action.
    ///
    /// The queueing result goes through `PanicHelper::panic("invalid sequence")`;
    /// NOTE(review): presumably this panics when the updater rejects the
    /// sequence - confirm against `crate::utils::PanicHelper`.
    pub fn queue(&mut self, action: D::Action) -> (u64, &D::AuthorityAction) {
        let authority_action = self.state.authority(action);
        let sequence = self.state.sequence();
        self.state.update(&authority_action);

        let queued = self.updater.queue_sequenced(sequence, authority_action);
        queued.panic("invalid sequence")
    }

    /// Reports whether the slot that the next update would write to can be
    /// write-locked right now.
    pub fn update_ready(&mut self) -> bool {
        let updater = &self.updater;
        updater.update_ready()
    }

    /// Runs one update pass on the shared copies; the "had update" flag is discarded.
    pub fn update(&mut self) {
        let _ = self.updater.update();
    }

    /// Applies every queued action to both shared copies.
    pub fn flush(&mut self) {
        self.updater.flush_queue()
    }

    /// The leader's own copy of the state.
    pub fn state(&self) -> &D {
        let Self { state, .. } = self;
        state
    }

    /// Runs `update` against the leader's state through a [`StatePtr`].
    ///
    /// If the closure took mutable access, the shared copies are reset to a
    /// clone of the leader's state, which also clears the pending queue.
    pub fn update_state<R, F: FnOnce(&mut StatePtr<D>) -> R>(&mut self, update: F) -> R {
        let (result, mutated) = {
            let mut ptr = StatePtr {
                item: &mut self.state,
                as_mut: false,
            };
            let result = update(&mut ptr);
            (result, ptr.as_mut)
        };

        if mutated {
            let snapshot = self.state.clone();
            self.updater.reset(snapshot);
        }
        result
    }

    /// Sequence number the next queued action will receive.
    pub fn next_sequence(&self) -> u64 {
        let updater = &self.updater;
        updater.queue_next_sequence
    }

    /// Demotes to the follower role, keeping the updater and dropping the
    /// leader's own state copy.
    pub fn follow(self) -> FollowUpdater<D> {
        let Self { updater, .. } = self;
        FollowUpdater { updater }
    }
}
/// Wrapper around a mutable reference that remembers whether mutable access
/// was ever requested through it.
pub struct StatePtr<'a, T> {
    /// The wrapped value.
    item: &'a mut T,
    /// Set to `true` by `deref_mut` / `as_mut`; never cleared by this type.
    as_mut: bool,
}

impl<'a, T> Deref for StatePtr<'a, T> {
    type Target = T;

    /// Shared access; leaves the mutation flag untouched.
    fn deref(&self) -> &T {
        &*self.item
    }
}

impl<'a, T> DerefMut for StatePtr<'a, T> {
    /// Mutable access; raises the mutation flag first.
    fn deref_mut(&mut self) -> &mut T {
        self.as_mut = true;
        &mut *self.item
    }
}

impl<'a, T> AsRef<T> for StatePtr<'a, T> {
    /// Shared access; leaves the mutation flag untouched.
    fn as_ref(&self) -> &T {
        Deref::deref(self)
    }
}

impl<'a, T> AsMut<T> for StatePtr<'a, T> {
    /// Mutable access; raises the mutation flag first.
    fn as_mut(&mut self) -> &mut T {
        DerefMut::deref_mut(self)
    }
}
/// Updater in the follower role: queues authority actions that arrive with an
/// explicit sequence number and keeps no state copy of its own.
pub struct FollowUpdater<D: DeterministicState> {
// Queue and writer for the two shared copies.
updater: StateUpdater<D>,
}
impl<D: DeterministicState> FollowUpdater<D> {
    /// Replaces both shared copies with `state` and clears the pending queue.
    pub fn reset(&mut self, state: D) {
        self.updater.reset(state);
    }

    /// Queues `action` if `sequence` is the next expected sequence number.
    ///
    /// Returns `false` (and drops the action) when the sequence does not match.
    pub fn queue(&mut self, sequence: u64, action: D::AuthorityAction) -> bool {
        self.updater.queue_sequenced(sequence, action).is_ok()
    }

    /// Runs one update pass on the shared copies; the "had update" flag is discarded.
    pub fn update(&mut self) {
        self.updater.update();
    }

    /// Runs `update` against a clone of the currently published state.
    ///
    /// If the closure took mutable access through the [`StatePtr`], the shared
    /// copies are reset to the modified clone, which also clears the pending
    /// queue. Otherwise the clone is discarded.
    pub fn update_state<R, F: FnOnce(&mut StatePtr<D>) -> R>(&mut self, update: F) -> R {
        let mut state = self.updater.read().clone();
        let mut ptr = StatePtr {
            item: &mut state,
            as_mut: false,
        };
        let result = update(&mut ptr);
        if ptr.as_mut {
            self.updater.reset(state);
        }
        result
    }

    /// Applies every queued action to both shared copies.
    pub fn flush(&mut self) {
        self.updater.flush_queue();
    }

    /// Acquires a read guard on the currently published copy of the state.
    ///
    /// Goes through the read position like `SharedState::read` and
    /// `update_state` do. Reading slot 0 unconditionally would return the copy
    /// that is one update pass behind whenever the read position is odd.
    pub fn read_state(&self) -> RwLockReadGuard<'_, D> {
        self.updater.read()
    }

    /// Sequence number the next queued action must carry.
    pub fn next_sequence(&self) -> u64 {
        self.updater.queue_next_sequence
    }

    /// Promotes to the leader role.
    ///
    /// The queue is flushed first so that both shared copies are identical;
    /// the leader's own state is then cloned from slot 0.
    ///
    /// # Panics
    ///
    /// Panics if the flush fails or if the two copies report different
    /// sequence numbers afterwards.
    pub fn lead(mut self) -> LeaderUpdater<D> {
        self.updater.flush_queue();
        let state = {
            let state0 = self.updater.inner.states[0].read();
            let state1 = self.updater.inner.states[1].read();
            assert_eq!(state0.sequence(), state1.sequence());
            state0.clone()
        };
        LeaderUpdater {
            state,
            updater: self.updater,
        }
    }
}
/// Writer side of the shared state: queues authority actions and replays them
/// onto the two copies held in `StateInner`.
pub struct StateUpdater<D: DeterministicState> {
// Storage shared with `SharedState` readers.
inner: Arc<StateInner<D>>,
// Authority actions not yet applied to both copies, paired with their sequence numbers.
queue: VecDeque<(u64, D::AuthorityAction)>,
// Sequence number assigned to the next action pushed onto `queue`.
queue_next_sequence: u64,
// Per copy: index into `queue` of the first action that copy has not applied yet.
queue_offset: [usize; 2],
}
/// Storage shared between readers and the updater: two copies of the state
/// plus the position that tells readers which copy to use.
struct StateInner<D: DeterministicState> {
// Counter whose lowest bit selects the copy that `read` tries to lock.
read_pos: AtomicUsize,
// The two copies; the updater writes to the one `read_pos` does not select.
states: [RwLock<D>; 2],
}
impl<D: DeterministicState> StateUpdater<D> {
    /// Returns a new read handle onto the storage this updater writes to.
    pub fn shared(&self) -> SharedState<D> {
        let inner = Arc::clone(&self.inner);
        SharedState { inner }
    }

    /// Replaces both copies with `state` and discards everything queued.
    ///
    /// The copy readers are not directed to is overwritten first and then
    /// published; only afterwards is the previously published copy overwritten.
    /// The next expected sequence becomes `state.sequence()`.
    pub fn reset(&mut self, state: D) {
        let current_pos = self.inner.read_pos.load(Ordering::Relaxed);
        let next_pos = current_pos.wrapping_add(1);
        let sequence = state.sequence();

        {
            let mut hidden = self.inner.states[next_pos & 0x1].write();
            *hidden = state.clone();
        }
        self.inner.read_pos.store(next_pos, Ordering::Release);
        {
            let mut previous = self.inner.states[current_pos & 0x1].write();
            *previous = state;
        }

        self.queue.clear();
        self.queue_offset = [0, 0];
        self.queue_next_sequence = sequence;
    }

    /// Queues `item` if `seq` equals the next expected sequence number.
    ///
    /// On success returns the sequence together with a reference to the queued
    /// action; on a mismatch hands `item` back unchanged as the error.
    pub fn queue_sequenced(&mut self, seq: u64, item: D::AuthorityAction) -> Result<(u64, &D::AuthorityAction), D::AuthorityAction> {
        if seq == self.queue_next_sequence {
            Ok((seq, self.queue(item)))
        } else {
            Err(item)
        }
    }

    /// Appends `item` under the next sequence number and returns a reference
    /// to the queued action.
    pub fn queue(&mut self, item: D::AuthorityAction) -> &D::AuthorityAction {
        let sequence = self.queue_next_sequence;
        self.queue.push_back((sequence, item));
        self.queue_next_sequence += 1;
        // The queue cannot be empty: an entry was pushed just above.
        self.queue.back().map(|(_, queued)| queued).unwrap()
    }

    /// Sequence number the next queued action will receive.
    pub fn next_queued_sequence(&self) -> u64 {
        self.queue_next_sequence
    }

    /// Reports whether the slot the next `update` would write to can be
    /// write-locked right now. The probe lock is released immediately.
    pub fn update_ready(&self) -> bool {
        let write_idx = self.inner.read_pos.load(Ordering::Relaxed).wrapping_add(1) & 0x1;
        self.inner.states[write_idx].try_write().is_some()
    }

    /// Applies every queued action to both copies.
    ///
    /// # Panics
    ///
    /// Panics if a third update pass still finds work, or if the queue is not
    /// empty afterwards.
    pub fn flush_queue(&mut self) {
        // One pass per copy brings both up to date.
        for _ in 0..2 {
            self.update();
        }
        assert!(
            !self.update(),
            "expected update to be flushed after 2 .update() calls"
        );
        assert_eq!(self.queue.len(), 0);
    }

    /// Acquires a read guard on the currently published copy.
    pub fn read(&self) -> RwLockReadGuard<'_, D> {
        let guard = self.inner.read();
        guard
    }

    /// Brings the copy readers are not directed to up to date and publishes it.
    ///
    /// Replays every queued action that copy has not seen yet, drops queue
    /// entries that both copies have applied, and then advances the read
    /// position (even when nothing was applied). Returns whether at least one
    /// action was applied.
    ///
    /// # Panics
    ///
    /// Panics if a queued action's sequence does not match the copy's current
    /// sequence, or if applying an action does not advance the sequence by one.
    pub fn update(&mut self) -> bool {
        let next_pos = self.inner.read_pos.load(Ordering::Relaxed).wrapping_add(1);
        let write_idx = next_pos & 0x1;
        let mut applied: usize = 0;

        {
            let mut state = self.inner.states[write_idx].write();
            let start = self.queue_offset[write_idx];
            let mut next_seq = state.sequence() + 1;

            for (target_seq, action) in self.queue.iter().skip(start) {
                assert_eq!(state.sequence(), *target_seq);
                applied += 1;
                state.update(action);
                assert_eq!(state.sequence(), next_seq, "state::update(action) did not increment sequence");
                next_seq += 1;
            }

            self.queue_offset[write_idx] = start + applied;
        }

        self.trim_queue();
        self.inner.read_pos.store(next_pos, Ordering::Release);
        applied > 0
    }

    /// Drops the queue prefix that both copies have already applied and shifts
    /// both offsets back by the same amount.
    fn trim_queue(&mut self) {
        let [first, second] = self.queue_offset;
        let consumed = first.min(second);

        // Never remove more entries than the queue holds.
        let removable = consumed.min(self.queue.len());
        drop(self.queue.drain(..removable));

        self.queue_offset = [first - consumed, second - consumed];
    }
}
#[cfg(test)]
mod test {
    use std::time::{SystemTime, UNIX_EPOCH};

    use super::*;

    /// Minimal state for tests: counts updates, remembers the timestamp of the
    /// last one and records every submitted number.
    #[derive(Clone, Debug, Default)]
    struct TestState {
        sequence: u64,
        time: u64,
        numbers: Vec<u64>,
    }

    impl DeterministicState for TestState {
        type Action = u64;
        /// `(milliseconds since the Unix epoch, submitted number)`.
        type AuthorityAction = (u64, u64);

        fn sequence(&self) -> u64 {
            self.sequence
        }

        /// Stamps the action with the current wall-clock time in milliseconds.
        fn authority(&self, action: Self::Action) -> Self::AuthorityAction {
            let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
            let now_millis = since_epoch.as_millis() as u64;
            (now_millis, action)
        }

        /// Advances the sequence by one and records the action's contents.
        fn update(&mut self, action: &Self::AuthorityAction) {
            let &(time, number) = action;
            self.sequence += 1;
            self.time = time;
            self.numbers.push(number);
        }
    }
}