use parking_lot::{Mutex, RwLock};
use crate::{transaction::with_tx, version::Version, Stm};
use std::{
any::Any,
marker::PhantomData,
mem,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};
/// Numeric identifier of a `TVar`; `TVar::new` hands these out from a global counter.
pub type ID = u64;
/// Type-erased, reference-counted value; this is the form in which `VVar` stores its payload.
type DynValue = Arc<dyn Any + Send + Sync>;
/// A type-erased value paired with a version tag.
///
/// Cloning is cheap with respect to the payload: `value` is an `Arc`, so a clone
/// shares the same allocation.
#[derive(Clone)]
pub struct VVar {
/// Version tag stored alongside `value`.
pub version: Version,
/// The payload, stored without its concrete type; recover it with `downcast`.
pub value: DynValue,
}
impl VVar {
pub fn downcast<T: Any + Sync + Send>(&self) -> Arc<T> {
match self.value.clone().downcast::<T>() {
Ok(s) => s,
Err(_) => unreachable!("TVar has wrong type"),
}
}
}
/// Sending half of an unbounded tokio channel used to signal a waiter with a `bool`.
///
/// In this file `WaitQueue::notify_all` sends `true` and `WaitQueue::prune` sends `false`.
type Signaler = tokio::sync::mpsc::UnboundedSender<bool>;
/// The signalers currently waiting on a variable, together with the version passed
/// to the most recent `notify_all`.
pub struct WaitQueue {
/// Version handed to the latest `notify_all` call; the default version until the first call.
pub last_written_version: Version,
/// Signalers that were added and have not yet been notified or pruned.
waiting: Vec<Signaler>,
/// Queue length that must be exceeded before `add` prunes; starts at 1 and never shrinks.
max_waiting: usize,
}
impl WaitQueue {
pub fn new() -> Self {
WaitQueue {
last_written_version: Default::default(),
waiting: Vec::new(),
max_waiting: 1,
}
}
pub fn add(&mut self, s: Signaler) {
self.prune();
self.waiting.push(s)
}
pub fn notify_all(&mut self, commit_version: Version) {
self.last_written_version = commit_version;
if self.waiting.is_empty() {
return;
}
let waiting = mem::take(&mut self.waiting);
for tx in waiting {
let _ = tx.send(true);
}
}
fn prune(&mut self) {
if self.waiting.len() > self.max_waiting {
self.waiting.retain(|tx| tx.send(false).is_ok());
self.max_waiting = self.max_waiting.max(self.waiting.len());
}
}
}
impl Default for WaitQueue {
fn default() -> Self {
Self::new()
}
}
/// State shared by every handle to one variable (a `TVar` holds it in an `Arc`).
pub struct SVar {
/// The versioned value, behind a read-write lock.
pub vvar: RwLock<VVar>,
/// The queue of waiters for this variable, behind a mutex.
pub queue: Mutex<WaitQueue>,
}
/// A handle to a shared variable bundled with its own copy of a versioned value
/// and two access flags.
///
/// NOTE(review): presumably the per-transaction record for one variable; the code
/// that populates and consumes it is outside this view — confirm against the
/// transaction module.
#[derive(Clone)]
pub struct LVar {
/// The shared state of the variable this record refers to.
pub svar: Arc<SVar>,
/// Versioned value held by this record, independent of the one inside `svar`.
pub vvar: VVar,
/// Read flag; presumably set once the variable has been read — TODO confirm.
pub read: bool,
/// Write flag; presumably set once the variable has been written — TODO confirm.
pub write: bool,
}
/// Typed handle to a shared variable holding a `T`.
///
/// The value is stored type-erased in the shared state; `T` is tracked only at
/// the type level. A clone refers to the same shared state and carries the same
/// `id`. Note that the derived `Clone` impl is only available when `T: Clone`.
#[derive(Clone)]
pub struct TVar<T> {
/// Identifier assigned by `TVar::new` from a global counter.
pub(crate) id: ID,
/// Shared state (versioned value and wait queue) behind this handle.
pub(crate) svar: Arc<SVar>,
/// Ties the handle to `T` without storing a `T`.
phantom: PhantomData<T>,
}
impl<T> Default for TVar<T>
where
T: Any + Sync + Send + Clone + Default,
{
fn default() -> Self {
Self::new(Default::default())
}
}
impl<T: Any + Sync + Send + Clone> TVar<T> {
    /// Creates a variable holding `value`.
    ///
    /// The variable gets the next id from a global counter and starts out at the
    /// default version with an empty wait queue.
    pub fn new(value: T) -> TVar<T> {
        // The counter is used here solely to hand out distinct ids.
        static COUNTER: AtomicU64 = AtomicU64::new(0);
        let id = COUNTER.fetch_add(1, Ordering::Relaxed);

        let initial = VVar {
            version: Default::default(),
            value: Arc::new(value),
        };
        let svar = Arc::new(SVar {
            vvar: RwLock::new(initial),
            queue: Mutex::new(WaitQueue::default()),
        });

        TVar {
            id,
            svar,
            phantom: PhantomData,
        }
    }

    /// Reads the variable through the transaction supplied by `with_tx` and
    /// returns an owned clone of the value.
    pub fn read_clone(&self) -> Stm<T> {
        with_tx(|tx| {
            let shared = tx.read(self);
            shared.map(|value| (*value).clone())
        })
    }

    /// Reads the variable through the transaction supplied by `with_tx`,
    /// returning the shared `Arc` without cloning the value.
    pub fn read(&self) -> Stm<Arc<T>> {
        with_tx(|tx| tx.read(self))
    }

    /// Writes `value` to the variable through the transaction supplied by `with_tx`.
    pub fn write(&self, value: T) -> Stm<()> {
        with_tx(move |tx| tx.write(self, value))
    }

    /// Replaces the value with `f(current)`, where `current` is a clone of the
    /// value read from the variable.
    pub fn update<F>(&self, f: F) -> Stm<()>
    where
        F: FnOnce(T) -> T,
    {
        let current = self.read_clone()?;
        let next = f(current);
        self.write(next)
    }

    /// Like [`TVar::update`], but `f` mutates a clone of the value in place and
    /// the mutated clone is written back.
    pub fn update_mut<F>(&self, f: F) -> Stm<()>
    where
        F: FnOnce(&mut T),
    {
        self.update(|mut current| {
            f(&mut current);
            current
        })
    }

    /// Applies `f` to a clone of the value; `f` returns the new value to write
    /// together with a result that is handed back to the caller.
    pub fn modify<F, R>(&self, f: F) -> Stm<R>
    where
        F: FnOnce(T) -> (T, R),
    {
        let current = self.read_clone()?;
        let (next, output) = f(current);
        self.write(next)?;
        Ok(output)
    }

    /// Like [`TVar::modify`], but `f` mutates a clone of the value in place and
    /// returns only the result; the mutated clone is written back.
    pub fn modify_mut<F, R>(&self, f: F) -> Stm<R>
    where
        F: FnOnce(&mut T) -> R,
    {
        self.modify(|mut current| {
            let output = f(&mut current);
            (current, output)
        })
    }

    /// Writes `value` and returns the value that was read just before the write.
    pub fn replace(&self, value: T) -> Stm<Arc<T>> {
        let previous = self.read()?;
        self.write(value)?;
        Ok(previous)
    }
}