use crate::rt::location::Location;
use crate::rt::object;
use crate::rt::{
    self, thread, Access, Numeric, Synchronize, VersionVec, MAX_ATOMIC_HISTORY, MAX_THREADS,
};
use std::cmp;
use std::marker::PhantomData;
use std::sync::atomic::Ordering;
use std::u16;
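
/// Tracks an atomic cell in the loom execution model.
///
/// `Atomic<T>` does not hold the value directly; it refers to a `State`
/// object owned by the execution, which records the cell's recent store
/// history so the model checker can explore every value a load is allowed
/// to observe.
///
/// Illustrative use through loom's public wrappers. This is a hedged sketch:
/// the public API shown lives outside this module, and the assertion is only
/// an example, not taken from this file.
///
/// ```ignore
/// loom::model(|| {
///     let a = loom::sync::atomic::AtomicUsize::new(0);
///     a.store(1, std::sync::atomic::Ordering::Release);
///     assert_eq!(1, a.load(std::sync::atomic::Ordering::Acquire));
/// });
/// ```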
#[derive(Debug)]
pub(crate) struct Atomic<T> {
    state: object::Ref<State>,
    _p: PhantomData<fn() -> T>,
}
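
/// Per-cell state stored in the execution: the bounded store history plus
/// the version vectors used to detect races between atomic and non-atomic
/// (`unsync_load` / `with_mut`) accesses.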
#[derive(Debug)]
pub(super) struct State {
    created_location: Location,
    /// Causality of the threads that have performed each kind of access.
    loaded_at: VersionVec,
    unsync_loaded_at: VersionVec,
    stored_at: VersionVec,
    unsync_mut_at: VersionVec,
    /// Set while a `with_mut` closure runs; any concurrent access panics.
    is_mutating: bool,
    last_access: Option<Access>,
    last_non_load_access: Option<Access>,
    /// Ring buffer of the most recent stores to this cell.
    stores: [Store; MAX_ATOMIC_HISTORY],
    /// Total number of stores issued; `index(cnt)` is the next slot written.
    cnt: u16,
}
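
/// The kind of atomic operation being performed. Loads are only considered
/// dependent on prior stores and read-modify-writes, not on other loads.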
#[derive(Debug, Copy, Clone)]
pub(super) enum Action {
    Load,
    Store,
    Rmw,
}
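
/// A single store in the cell's history, together with the causality
/// information needed to decide which loads may observe it.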
#[derive(Debug)]
struct Store {
    /// The stored value, widened to `u64`.
    value: u64,
    /// Causality of the storing thread at the point of the store.
    happens_before: VersionVec,
    /// Tracks where this store sits in the cell's modification order.
    modification_order: VersionVec,
    sync: Synchronize,
    /// The point at which each thread first observed this store.
    first_seen: FirstSeen,
    /// True if the store used `SeqCst` ordering.
    seq_cst: bool,
}
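
/// For each thread, the atomic "version" at which that thread first saw a
/// store; `u16::MAX` means the thread has not seen it yet.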
#[derive(Debug)]
struct FirstSeen([u16; MAX_THREADS]);
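
/// An atomic fence. Only `Acquire` fences are modeled: the fence acquires
/// the synchronization of every store, on every atomic cell, that the
/// current thread has already seen.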
pub(crate) fn fence(ordering: Ordering) {
    use std::sync::atomic::Ordering::Acquire;

    assert_eq!(
        ordering, Acquire,
        "only Acquire fences are currently supported"
    );

    rt::synchronize(|execution| {
        // Iterate over every atomic object in the execution and acquire each
        // store that the current thread has already seen, as if it had been
        // loaded with the fence's ordering.
        for state in execution.objects.iter_mut::<State>() {
            for store in state.stores_mut() {
                if !store.first_seen.is_seen_by_current(&execution.threads) {
                    continue;
                }

                store.sync.sync_load(&mut execution.threads, ordering);
            }
        }
    });
}

impl<T: Numeric> Atomic<T> {
    pub(crate) fn new(value: T, location: Location) -> Atomic<T> {
        rt::execution(|execution| {
            let state = State::new(&mut execution.threads, value.into_u64(), location);
            let state = execution.objects.insert(state);

            Atomic {
                state,
                _p: PhantomData,
            }
        })
    }
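
    /// An atomic load.
    ///
    /// The stores this load is permitted to observe (per coherence and
    /// seq-cst constraints) are recorded as a branch point in the exploration
    /// path; the chosen branch determines which store's value is returned.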
    pub(crate) fn load(&self, ordering: Ordering) -> T {
        self.branch(Action::Load);

        super::synchronize(|execution| {
            let state = self.state.get_mut(&mut execution.objects);

            if execution.path.is_traversed() {
                let mut seed = [0; MAX_ATOMIC_HISTORY];
                let n = state.match_load_to_stores(&execution.threads, &mut seed[..], ordering);
                execution.path.push_load(&seed[..n]);
            }

            let index = execution.path.branch_load();
            T::from_u64(state.load(&mut execution.threads, index, ordering))
        })
    }
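
    /// An unsynchronized load: returns the most recently stored value without
    /// synchronizing, asserting that no mutation or atomic store races with
    /// it.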
    pub(crate) fn unsync_load(&self) -> T {
        rt::execution(|execution| {
            let state = self.state.get_mut(&mut execution.objects);
            state.track_unsync_load(&execution.threads);

            // The most recent store lives in the last slot written.
            let index = index(state.cnt - 1);
            T::from_u64(state.stores[index].value)
        })
    }
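
    /// An atomic store with the given ordering.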
    pub(crate) fn store(&self, val: T, ordering: Ordering) {
        self.branch(Action::Store);

        super::synchronize(|execution| {
            let state = self.state.get_mut(&mut execution.objects);
            state.track_store(&execution.threads);
            state.store(
                &mut execution.threads,
                Synchronize::new(),
                val.into_u64(),
                ordering,
            );
        })
    }
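
    /// An atomic read-modify-write. `f` receives the observed value and
    /// either returns the new value to store (synchronizing with `success`)
    /// or an error (synchronizing with `failure`). On success the previously
    /// observed value is returned.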
    pub(crate) fn rmw<F, E>(&self, success: Ordering, failure: Ordering, f: F) -> Result<T, E>
    where
        F: FnOnce(T) -> Result<T, E>,
    {
        self.branch(Action::Rmw);

        super::synchronize(|execution| {
            let state = self.state.get_mut(&mut execution.objects);

            if execution.path.is_traversed() {
                let mut seed = [0; MAX_ATOMIC_HISTORY];
                let n = state.match_rmw_to_stores(&mut seed[..]);
                execution.path.push_load(&seed[..n]);
            }

            let index = execution.path.branch_load();

            state
                .rmw(&mut execution.threads, index, success, failure, |num| {
                    f(T::from_u64(num)).map(T::into_u64)
                })
                .map(T::from_u64)
        })
    }
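
    /// Provides mutable, unsynchronized access to the value, mirroring
    /// `get_mut` on the std atomics. The cell is flagged as mutating for the
    /// duration of the closure so that any concurrent access is reported, and
    /// the (possibly modified) value is written back when the guard drops.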
    pub(crate) fn with_mut<R>(&mut self, f: impl FnOnce(&mut T) -> R) -> R {
        let value = super::execution(|execution| {
            let state = self.state.get_mut(&mut execution.objects);
            state.track_unsync_mut(&execution.threads);
            state.is_mutating = true;

            let index = index(state.cnt - 1);
            T::from_u64(state.stores[index].value)
        });

        // Guard that writes the value back and clears `is_mutating`, even if
        // the closure panics.
        struct Reset<T: Numeric>(T, object::Ref<State>);

        impl<T: Numeric> Drop for Reset<T> {
            fn drop(&mut self) {
                super::execution(|execution| {
                    let state = self.1.get_mut(&mut execution.objects);

                    assert!(state.is_mutating);
                    state.is_mutating = false;

                    let index = index(state.cnt - 1);
                    state.stores[index].value = T::into_u64(self.0);

                    // Skip the causality check when unwinding to avoid a
                    // double panic.
                    if !std::thread::panicking() {
                        state.track_unsync_mut(&execution.threads);
                    }
                });
            }
        }

        let mut reset = Reset(value, self.state);
        f(&mut reset.0)
    }
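
    /// Registers a branch point for the given action, checking that the
    /// object reference was not corrupted in the process.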
    fn branch(&self, action: Action) {
        let r = self.state;
        r.branch_action(action);
        assert!(
            r.ref_eq(self.state),
            "Internal state mutated during branch. This is \
             usually due to a bug in the algorithm being tested writing to \
             an invalid memory location."
        );
    }
}

impl State {
    fn new(threads: &mut thread::Set, value: u64, location: Location) -> State {
        let mut state = State {
            created_location: location,
            loaded_at: VersionVec::new(),
            unsync_loaded_at: VersionVec::new(),
            stored_at: VersionVec::new(),
            unsync_mut_at: VersionVec::new(),
            is_mutating: false,
            last_access: None,
            last_non_load_access: None,
            stores: Default::default(),
            cnt: 0,
        };

        // Record the initial value as the first store in the history.
        state.track_unsync_mut(threads);
        state.store(threads, Synchronize::new(), value, Ordering::Release);

        state
    }
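
    /// Loads the value of the store at `index`, applying coherence and
    /// synchronizing with the store's `Synchronize` state.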
    fn load(&mut self, threads: &mut thread::Set, index: usize, ordering: Ordering) -> u64 {
        self.track_load(threads);
        self.apply_load_coherence(threads, index);

        let store = &mut self.stores[index];
        store.first_seen.touch(threads);
        store.sync.sync_load(threads, ordering);
        store.value
    }
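
    /// Appends a store to the history, ordering it after every store the
    /// current thread has already seen.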
    fn store(
        &mut self,
        threads: &mut thread::Set,
        mut sync: Synchronize,
        value: u64,
        ordering: Ordering,
    ) {
        let index = index(self.cnt);
        self.cnt += 1;

        let happens_before = threads.active().causality.clone();

        // The new store is ordered after every store already seen by the
        // storing thread.
        let mut modification_order = happens_before;

        for i in 0..self.stores.len() {
            if self.stores[i].first_seen.is_seen_by_current(threads) {
                let mo = self.stores[i].modification_order;
                modification_order.join(&mo);
            }
        }

        sync.sync_store(threads, ordering);

        let mut first_seen = FirstSeen::new();
        first_seen.touch(threads);

        self.stores[index] = Store {
            value,
            happens_before,
            modification_order,
            sync,
            first_seen,
            seq_cst: is_seq_cst(ordering),
        };
    }
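
    /// A read-modify-write of the store at `index`: the observed value is
    /// passed to `f`, and on `Ok` the result is appended as a new store that
    /// carries forward the synchronization of the store it read from.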
    fn rmw<E>(
        &mut self,
        threads: &mut thread::Set,
        index: usize,
        success: Ordering,
        failure: Ordering,
        f: impl FnOnce(u64) -> Result<u64, E>,
    ) -> Result<u64, E> {
        self.track_load(threads);
        self.apply_load_coherence(threads, index);

        self.stores[index].first_seen.touch(threads);
        let prev = self.stores[index].value;

        match f(prev) {
            Ok(next) => {
                self.track_store(threads);
                self.stores[index].sync.sync_load(threads, success);

                let sync = self.stores[index].sync;
                self.store(threads, sync, next, success);
                Ok(prev)
            }
            Err(e) => {
                self.stores[index].sync.sync_load(threads, failure);
                Err(e)
            }
        }
    }
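
    /// Applies coherence: the observed store is ordered after every store the
    /// current thread has already seen and after every store that
    /// happens-before this access.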
    fn apply_load_coherence(&mut self, threads: &mut thread::Set, index: usize) {
        for i in 0..self.stores.len() {
            if index == i {
                continue;
            }

            if self.stores[i].first_seen.is_seen_by_current(threads) {
                let mo = self.stores[i].modification_order;
                self.stores[index].modification_order.join(&mo);
            }

            if self.stores[i].happens_before < threads.active().causality {
                let mo = self.stores[i].modification_order;
                self.stores[index].modification_order.join(&mo);
            }
        }
    }
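
    /// The `track_*` methods assert that the current access does not race
    /// with a conflicting access and then record it in the matching version
    /// vector.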
    fn track_load(&mut self, threads: &thread::Set) {
        assert!(!self.is_mutating, "atomic cell is in `with_mut` call");

        let current = &threads.active().causality;

        assert!(
            self.unsync_mut_at <= *current,
            "Causality violation: Concurrent load and mut accesses"
        );

        self.loaded_at.join(current);
    }

    fn track_unsync_load(&mut self, threads: &thread::Set) {
        assert!(!self.is_mutating, "atomic cell is in `with_mut` call");

        let current = &threads.active().causality;

        assert!(
            self.unsync_mut_at <= *current,
            "Causality violation: Concurrent `unsync_load` and mut accesses"
        );
        assert!(
            self.stored_at <= *current,
            "Causality violation: Concurrent `unsync_load` and atomic store"
        );

        self.unsync_loaded_at.join(current);
    }

    fn track_store(&mut self, threads: &thread::Set) {
        assert!(!self.is_mutating, "atomic cell is in `with_mut` call");

        let current = &threads.active().causality;

        assert!(
            self.unsync_mut_at <= *current,
            "Causality violation: Concurrent atomic store and mut accesses"
        );
        assert!(
            self.unsync_loaded_at <= *current,
            "Causality violation: Concurrent `unsync_load` and atomic store"
        );

        self.stored_at.join(current);
    }

    fn track_unsync_mut(&mut self, threads: &thread::Set) {
        assert!(!self.is_mutating, "atomic cell is in `with_mut` call");

        let current = &threads.active().causality;

        assert!(
            self.loaded_at <= *current,
            "Causality violation: Concurrent atomic load and unsync mut accesses"
        );
        assert!(
            self.unsync_loaded_at <= *current,
            "Causality violation: Concurrent `unsync_load` and unsync mut accesses"
        );
        assert!(
            self.stored_at <= *current,
            "Causality violation: Concurrent atomic store and unsync mut accesses"
        );
        assert!(
            self.unsync_mut_at <= *current,
            "Causality violation: Concurrent unsync mut accesses"
        );

        self.unsync_mut_at.join(current);
    }
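
    /// Writes into `dst` the index of every store that a load with `ordering`
    /// may observe, returning how many indices were written. A store is
    /// excluded when a store newer in the modification order exists and
    /// either the newer store was already seen by the current thread, the
    /// candidate was seen before the thread's last yield, or both stores and
    /// the load are `SeqCst`.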
    fn match_load_to_stores(
        &self,
        threads: &thread::Set,
        dst: &mut [u8],
        ordering: Ordering,
    ) -> usize {
        let mut n = 0;
        let cnt = self.cnt as usize;

        'outer: for i in 0..self.stores.len() {
            let store_i = &self.stores[i];

            if i >= cnt {
                // The slot has not been written yet.
                continue;
            }

            for j in 0..self.stores.len() {
                let store_j = &self.stores[j];

                if i == j || j >= cnt {
                    continue;
                }

                let mo_i = store_i.modification_order;
                let mo_j = store_j.modification_order;

                assert_ne!(mo_i, mo_j);

                if mo_i < mo_j {
                    // `store_j` is newer in the modification order.
                    if store_j.first_seen.is_seen_by_current(threads) {
                        // The current thread has already seen a newer store;
                        // coherence forbids returning to `store_i`.
                        continue 'outer;
                    }

                    if store_i.first_seen.is_seen_before_yield(threads) {
                        // `store_i` was already seen before the last yield.
                        continue 'outer;
                    }

                    if is_seq_cst(ordering) && store_i.seq_cst && store_j.seq_cst {
                        // A SeqCst load may not skip past a newer SeqCst store.
                        continue 'outer;
                    }
                }
            }

            dst[n] = i as u8;
            n += 1;
        }

        n
    }
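
    /// Like `match_load_to_stores`, but for the read half of a
    /// read-modify-write: only stores that are maximal in the modification
    /// order may be observed.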
    fn match_rmw_to_stores(&self, dst: &mut [u8]) -> usize {
        let mut n = 0;
        let cnt = self.cnt as usize;

        'outer: for i in 0..self.stores.len() {
            let store_i = &self.stores[i];

            if i >= cnt {
                continue;
            }

            for j in 0..self.stores.len() {
                let store_j = &self.stores[j];

                if i == j || j >= cnt {
                    continue;
                }

                let mo_i = store_i.modification_order;
                let mo_j = store_j.modification_order;

                assert_ne!(mo_i, mo_j);

                if mo_i < mo_j {
                    // A newer store exists; an RMW must operate on the latest
                    // value.
                    continue 'outer;
                }
            }

            dst[n] = i as u8;
            n += 1;
        }

        n
    }
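
    /// Iterates over the populated stores from oldest to newest, accounting
    /// for wrap-around in the ring buffer.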
    fn stores_mut(&mut self) -> impl DoubleEndedIterator<Item = &mut Store> {
        let (start, end) = range(self.cnt);

        let (two, one) = self.stores[..end].split_at_mut(start);
        one.iter_mut().chain(two.iter_mut())
    }

    pub(super) fn last_dependent_access(&self, action: Action) -> Option<&Access> {
        match action {
            // Loads only depend on prior stores and RMWs, not on other loads.
            Action::Load => self.last_non_load_access.as_ref(),
            _ => self.last_access.as_ref(),
        }
    }

    pub(super) fn set_last_access(
        &mut self,
        action: Action,
        path_id: usize,
        version: &VersionVec,
    ) {
        Access::set_or_create(&mut self.last_access, path_id, version);

        match action {
            Action::Load => {}
            _ => {
                Access::set_or_create(&mut self.last_non_load_access, path_id, version);
            }
        }
    }
}

impl Default for Store {
    fn default() -> Store {
        Store {
            value: 0,
            happens_before: VersionVec::new(),
            modification_order: VersionVec::new(),
            sync: Synchronize::new(),
            first_seen: FirstSeen::new(),
            seq_cst: false,
        }
    }
}

impl FirstSeen {
    fn new() -> FirstSeen {
        FirstSeen([u16::max_value(); MAX_THREADS])
    }

    /// Records the point at which the current thread first saw the store.
    fn touch(&mut self, threads: &thread::Set) {
        if self.0[threads.active_id().as_usize()] == u16::max_value() {
            self.0[threads.active_id().as_usize()] = threads.active_atomic_version();
        }
    }

    /// Returns `true` if the current thread's causality covers the point at
    /// which any thread first saw this store.
    fn is_seen_by_current(&self, threads: &thread::Set) -> bool {
        for (thread_id, version) in threads.active().causality.versions(threads.execution_id()) {
            match self.0[thread_id.as_usize()] {
                u16::MAX => {}
                v if v <= version => return true,
                _ => {}
            }
        }

        false
    }

    /// Returns `true` if the current thread saw this store before its most
    /// recent yield.
    fn is_seen_before_yield(&self, threads: &thread::Set) -> bool {
        let thread_id = threads.active_id();

        let last_yield = match threads.active().last_yield {
            Some(v) => v,
            None => return false,
        };

        match self.0[thread_id.as_usize()] {
            u16::MAX => false,
            v => v <= last_yield,
        }
    }
}

fn is_seq_cst(order: Ordering) -> bool {
    match order {
        Ordering::SeqCst => true,
        _ => false,
    }
}
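
/// Returns `(start, end)` such that the populated slots, oldest first, are
/// `stores[start..end]` followed by `stores[..start]`.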
fn range(cnt: u16) -> (usize, usize) {
    let start = index(cnt.saturating_sub(MAX_ATOMIC_HISTORY as u16));
    let mut end = index(cmp::min(cnt, MAX_ATOMIC_HISTORY as u16));

    if end == 0 {
        end = MAX_ATOMIC_HISTORY;
    }

    assert!(
        start <= end,
        "[loom internal bug] cnt = {}; start = {}; end = {}",
        cnt,
        start,
        end
    );

    (start, end)
}
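
/// Maps the monotonically increasing store count onto a slot in the bounded
/// store history.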
fn index(cnt: u16) -> usize {
    cnt as usize % MAX_ATOMIC_HISTORY as usize
}