use std::collections::HashMap;
use crate::runtime::l0::OccReadSet;
use crate::runtime::occ::{CommitRegistry, WriteSet};
use crate::runtime::sync::{self, Arc, AtomicU64, Mutex, Ordering};
use uni_common::core::id::Vid;
/// Capacity handed to `CommitRegistry::new` when a `Shared` is built through `Shared::new`.
/// Models that need a different value call `Shared::with_capacity` directly.
const REGISTRY_CAPACITY: usize = 16;
/// In-memory multi-version value store used by the models in this file.
///
/// Each vertex id maps to its version history: `(sequence, value)` pairs kept in the
/// order in which they were appended.
struct VersionedStore {
    versions: HashMap<Vid, Vec<(u64, i64)>>,
}

impl VersionedStore {
    /// Builds a store where every `(vid, value)` of `initial` becomes the only version of
    /// that vid, recorded at sequence 0. If a vid appears more than once, the last entry
    /// replaces the earlier ones.
    fn new(initial: &[(Vid, i64)]) -> Self {
        let mut versions = HashMap::with_capacity(initial.len());
        for &(vid, val) in initial {
            versions.insert(vid, vec![(0u64, val)]);
        }
        Self { versions }
    }

    /// Returns the value of `vid` as seen at `read_seq`.
    ///
    /// The history is scanned from the most recently appended version backwards and the
    /// first one whose sequence is `<= read_seq` is returned. Yields 0 when `vid` is
    /// unknown or has no such version.
    fn read_at(&self, read_seq: u64, vid: Vid) -> i64 {
        if let Some(history) = self.versions.get(&vid) {
            for &(seq, val) in history.iter().rev() {
                if seq <= read_seq {
                    return val;
                }
            }
        }
        0
    }

    /// Appends `(seq, val)` to the history of `vid`, creating the history if it is missing.
    fn apply(&mut self, seq: u64, vid: Vid, val: i64) {
        let history = self.versions.entry(vid).or_default();
        history.push((seq, val));
    }

    /// Returns the most recently appended value of `vid`, or 0 when there is none.
    fn latest(&self, vid: Vid) -> i64 {
        match self.versions.get(&vid).and_then(|history| history.last()) {
            Some(&(_, val)) => val,
            None => 0,
        }
    }
}
/// Handles to the state shared by every thread of a model run.
///
/// Cloning copies the three `Arc` handles, so all clones refer to the same sequence
/// counter, commit registry and versioned store.
#[derive(Clone)]
struct Shared {
    /// Sequence counter: loaded by `snapshot` and passed to `CommitRegistry::commit`.
    seq: Arc<AtomicU64>,
    /// Registry that `try_commit` checks and then commits into.
    registry: Arc<Mutex<CommitRegistry>>,
    /// Versioned values read by `snapshot` and appended to by `try_commit`.
    store: Arc<Mutex<VersionedStore>>,
}

impl Shared {
    /// Creates the shared state with a registry of `REGISTRY_CAPACITY`.
    fn new(initial: &[(Vid, i64)]) -> Self {
        Self::with_capacity(initial, REGISTRY_CAPACITY)
    }

    /// Creates the shared state: sequence counter at 0, a `CommitRegistry` built with
    /// `capacity`, and a store seeded from `initial`.
    fn with_capacity(initial: &[(Vid, i64)], capacity: usize) -> Self {
        let seq = Arc::new(AtomicU64::new(0));
        let registry = Arc::new(Mutex::new(CommitRegistry::new(capacity)));
        let store = Arc::new(Mutex::new(VersionedStore::new(initial)));
        Self {
            seq,
            registry,
            store,
        }
    }

    /// Reads the current sequence and the value of each vid in `vids` at that sequence.
    ///
    /// Returns `(read_seq, values)` with `values` in the same order as `vids`. The store
    /// lock is held for the whole call, including the sequence load.
    fn snapshot(&self, vids: &[Vid]) -> (u64, Vec<i64>) {
        let store = self.store.lock().unwrap();
        // NOTE(review): the relaxed load relies on `seq` only changing while the store lock
        // is held, as `try_commit` does below — confirm `CommitRegistry::commit` is the
        // only writer.
        let read_seq = self.seq.load(Ordering::Relaxed);
        let mut vals = Vec::with_capacity(vids.len());
        for &vid in vids {
            vals.push(store.read_at(read_seq, vid));
        }
        (read_seq, vals)
    }

    /// Attempts to commit `writes` for a transaction that took its snapshot at `read_seq`.
    ///
    /// Returns `false`, without touching the store, when the registry's `check` returns
    /// `Some` for `(read_seq, ws, rs)`. Otherwise `ws` is committed to the registry, every
    /// `(vid, value)` in `writes` is appended to the store at the sequence returned by
    /// `commit`, and `true` is returned.
    fn try_commit(
        &self,
        read_seq: u64,
        ws: WriteSet,
        rs: Option<&OccReadSet>,
        writes: &[(Vid, i64)],
    ) -> bool {
        // The registry lock is taken first and held until the end of the call.
        let mut registry = self.registry.lock().unwrap();
        if registry.check(read_seq, &ws, rs).is_some() {
            return false;
        }
        // The store is locked before the commit so the sequence change and the new
        // versions happen inside the same store critical section that `snapshot` uses.
        let mut store = self.store.lock().unwrap();
        let commit_seq = registry.commit(&self.seq, ws);
        writes
            .iter()
            .for_each(|&(vid, val)| store.apply(commit_seq, vid, val));
        true
    }
}
/// Model: `n` threads each snapshot a single counter vertex and try to commit
/// `snapshot value + 1` with a write set containing only that vertex and no read set.
///
/// The scenario runs inside `sync::check` (presumably a model-checker entry point that
/// explores thread interleavings — confirm against `crate::runtime::sync`).
///
/// # Panics
///
/// Panics if the final counter value differs from the number of threads whose
/// `try_commit` returned `true`, or if a worker thread panicked.
pub fn run_counter_model(n: usize) {
    sync::check(move || {
        let counter = Vid::new(1);
        let shared = Shared::new(&[(counter, 0)]);

        let mut workers = Vec::with_capacity(n);
        for _ in 0..n {
            let shared = shared.clone();
            workers.push(sync::thread::spawn(move || {
                let (read_seq, vals) = shared.snapshot(&[counter]);
                let mut ws = WriteSet::default();
                ws.vertices.insert(counter);
                shared.try_commit(read_seq, ws, None, &[(counter, vals[0] + 1)])
            }));
        }

        // Count the workers that reported a successful commit.
        let mut committed: i64 = 0;
        for worker in workers {
            if worker.join().unwrap() {
                committed += 1;
            }
        }

        let final_c = shared.store.lock().unwrap().latest(counter);
        assert_eq!(
            final_c, committed,
            "lost update: final counter {final_c} != {committed} committed increments",
        );
    });
}
/// Model: two vertices `x` and `y` both start at 1. One thread per vertex snapshots both
/// and, if the *other* vertex is at least 1, tries to set its own vertex to 0. The write
/// set holds only its own vertex; the read set holds both.
///
/// The scenario runs inside `sync::check` (presumably a model-checker entry point that
/// explores thread interleavings — confirm against `crate::runtime::sync`).
///
/// # Panics
///
/// Panics if, after both threads finish, `x + y < 1` or either value is negative, or if
/// a worker thread panicked.
pub fn run_bank_model() {
    sync::check(|| {
        let x = Vid::new(1);
        let y = Vid::new(2);
        let shared = Shared::new(&[(x, 1), (y, 1)]);

        let mut workers = Vec::with_capacity(2);
        for mine in [x, y] {
            let shared = shared.clone();
            workers.push(sync::thread::spawn(move || {
                let (read_seq, vals) = shared.snapshot(&[x, y]);
                // `vals` follows the `[x, y]` order, so the other account sits at the
                // opposite index from `mine`.
                let other_idx = if mine == x { 1 } else { 0 };
                let other_val = vals[other_idx];
                if other_val >= 1 {
                    let mut ws = WriteSet::default();
                    ws.vertices.insert(mine);
                    let mut rs = OccReadSet::default();
                    rs.vertices.insert(x);
                    rs.vertices.insert(y);
                    // The commit result is deliberately ignored; only the final state is
                    // checked below.
                    shared.try_commit(read_seq, ws, Some(&rs), &[(mine, 0)]);
                }
            }));
        }
        for worker in workers {
            worker.join().unwrap();
        }

        let st = shared.store.lock().unwrap();
        let fx = st.latest(x);
        let fy = st.latest(y);
        assert!(
            fx + fy >= 1 && fx >= 0 && fy >= 0,
            "write skew: constraint x + y >= 1 violated (x = {fx}, y = {fy})",
        );
    });
}
/// Model: with a commit registry built with capacity 1, two threads each try to increment
/// vertex `h` while a third increments an unrelated vertex `f`.
///
/// NOTE(review): the capacity of 1 is presumably chosen so the registry discards older
/// commit records while the `h` writers are still in flight — confirm against
/// `CommitRegistry`.
///
/// The scenario runs inside `sync::check` (presumably a model-checker entry point that
/// explores thread interleavings — confirm against `crate::runtime::sync`).
///
/// # Panics
///
/// Panics if the final value of `h` differs from the number of `h` writers whose
/// `try_commit` returned `true`, or if a worker thread panicked.
pub fn run_truncation_model() {
    sync::check(|| {
        let h = Vid::new(1);
        let f = Vid::new(2);
        let shared = Shared::with_capacity(&[(h, 0), (f, 0)], 1);

        // Spawn order matters for the indexing below: the two `h` writers come first,
        // then the `f` writer.
        let handles: Vec<_> = [h, h, f]
            .into_iter()
            .map(|target| {
                let shared = shared.clone();
                sync::thread::spawn(move || {
                    let (read_seq, vals) = shared.snapshot(&[target]);
                    let mut ws = WriteSet::default();
                    ws.vertices.insert(target);
                    shared.try_commit(read_seq, ws, None, &[(target, vals[0] + 1)])
                })
            })
            .collect();

        let mut committed: Vec<bool> = Vec::with_capacity(handles.len());
        for handle in handles {
            committed.push(handle.join().unwrap());
        }

        // Only the first two results belong to `h` writers.
        let committed_h = i64::from(committed[0]) + i64::from(committed[1]);
        let final_h = shared.store.lock().unwrap().latest(h);
        assert_eq!(
            final_h, committed_h,
            "lost update on h under truncation: final {final_h} != {committed_h} committed h-writes",
        );
    });
}