#![doc(test(attr(deny(warnings))))]
#![warn(missing_docs)]
#![cfg_attr(docsrs, feature(doc_cfg))]
#![allow(deprecated)]
pub mod access;
mod as_raw;
pub mod cache;
mod compile_fail_tests;
mod debt;
pub mod docs;
mod ref_cnt;
#[cfg(feature = "serde")]
mod serde;
pub mod strategy;
#[cfg(feature = "weak")]
mod weak;
use std::borrow::Borrow;
use std::fmt::{Debug, Display, Formatter, Result as FmtResult};
use std::marker::PhantomData;
use std::mem;
use std::ops::Deref;
use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};
use std::sync::Arc;
use crate::access::{Access, Map};
pub use crate::as_raw::AsRaw;
pub use crate::cache::Cache;
pub use crate::ref_cnt::RefCnt;
use crate::strategy::hybrid::{DefaultConfig, HybridStrategy};
use crate::strategy::sealed::Protected;
use crate::strategy::{CaS, Strategy};
pub use crate::strategy::{DefaultStrategy, IndependentStrategy};
/// A temporary storage of the pointer.
///
/// This guard object is returned from most loading methods (notably [`ArcSwapAny::load`]). It
/// dereferences to the smart pointer loaded, so most operations are to be done using that.
pub struct Guard<T: RefCnt, S: Strategy<T> = DefaultStrategy> {
    // Strategy-specific protected representation of the loaded value; keeps the
    // value alive (protected from destruction) for as long as the guard exists.
    inner: S::Protected,
}
impl<T: RefCnt, S: Strategy<T>> Guard<T, S> {
#[allow(clippy::wrong_self_convention)]
#[inline]
pub fn into_inner(lease: Self) -> T {
lease.inner.into_inner()
}
pub fn from_inner(inner: T) -> Self {
Guard {
inner: S::Protected::from_inner(inner),
}
}
}
impl<T: RefCnt, S: Strategy<T>> Deref for Guard<T, S> {
    type Target = T;

    // The guard is transparent: dereferencing it yields the stored smart pointer.
    #[inline]
    fn deref(&self) -> &T {
        <S::Protected as Borrow<T>>::borrow(&self.inner)
    }
}
impl<T: RefCnt, S: Strategy<T>> From<T> for Guard<T, S> {
fn from(inner: T) -> Self {
Self::from_inner(inner)
}
}
impl<T: Default + RefCnt, S: Strategy<T>> Default for Guard<T, S> {
fn default() -> Self {
Self::from(T::default())
}
}
impl<T: Debug + RefCnt, S: Strategy<T>> Debug for Guard<T, S> {
    fn fmt(&self, formatter: &mut Formatter) -> FmtResult {
        // Format transparently, as the pointed-to value would.
        Debug::fmt(&**self, formatter)
    }
}
impl<T: Display + RefCnt, S: Strategy<T>> Display for Guard<T, S> {
    fn fmt(&self, formatter: &mut Formatter) -> FmtResult {
        // Format transparently, as the pointed-to value would.
        Display::fmt(&**self, formatter)
    }
}
/// Compares two pointer-like values for raw-pointer (address) equality.
#[allow(clippy::needless_pass_by_value)]
fn ptr_eq<Base, A, B>(a: A, b: B) -> bool
where
    A: AsRaw<Base>,
    B: AsRaw<Base>,
{
    // Equality of the underlying addresses, not of the pointed-to values.
    ptr::eq(a.as_raw(), b.as_raw())
}
/// An atomic storage for a smart pointer.
///
/// The stored pointer can be atomically loaded, stored and swapped from multiple
/// threads concurrently. The `S` type parameter selects the locking/protection
/// strategy used by readers (defaults to [`DefaultStrategy`]).
pub struct ArcSwapAny<T: RefCnt, S: Strategy<T> = DefaultStrategy> {
    // The raw pointer representation of the stored value; this is the single
    // atomically-updated cell everything revolves around.
    ptr: AtomicPtr<T::Base>,
    // We own one T's worth of reference count through `ptr`, even though we
    // store only the raw pointer — express that ownership to the type system.
    _phantom_arc: PhantomData<T>,
    // The strategy instance coordinating readers and writers.
    strategy: S,
}
impl<T: RefCnt, S: Default + Strategy<T>> From<T> for ArcSwapAny<T, S> {
    fn from(val: T) -> Self {
        // Pair the value with a default-constructed strategy instance.
        let strategy = S::default();
        Self::with_strategy(val, strategy)
    }
}
impl<T: RefCnt, S: Strategy<T>> Drop for ArcSwapAny<T, S> {
    fn drop(&mut self) {
        // `&mut self` guarantees exclusive access, so `get_mut` needs no
        // atomic synchronization to read the pointer.
        let ptr = *self.ptr.get_mut();
        unsafe {
            // First let the strategy make sure no reader can still be using
            // the pointer, only then release our reference count.
            self.strategy.wait_for_readers(ptr, &self.ptr);
            T::dec(ptr);
        }
    }
}
impl<T, S: Strategy<T>> Debug for ArcSwapAny<T, S>
where
    T: Debug + RefCnt,
{
    fn fmt(&self, formatter: &mut Formatter) -> FmtResult {
        // Take a temporary snapshot and render as a one-field tuple struct.
        let snapshot = self.load();
        formatter.debug_tuple("ArcSwapAny").field(&snapshot).finish()
    }
}
impl<T, S: Strategy<T>> Display for ArcSwapAny<T, S>
where
    T: Display + RefCnt,
{
    fn fmt(&self, formatter: &mut Formatter) -> FmtResult {
        // Display the currently stored value through a temporary guard.
        Display::fmt(&self.load(), formatter)
    }
}
impl<T: RefCnt + Default, S: Default + Strategy<T>> Default for ArcSwapAny<T, S> {
    fn default() -> Self {
        // Store the default value of the inner type, with the default strategy.
        Self::from(T::default())
    }
}
impl<T: RefCnt, S: Strategy<T>> ArcSwapAny<T, S> {
    /// Constructs a new storage holding the given value, with the default
    /// instance of the strategy.
    pub fn new(val: T) -> Self
    where
        S: Default,
    {
        Self::from(val)
    }

    /// Constructs a new storage from a value and an explicit strategy instance.
    pub fn with_strategy(val: T, strategy: S) -> Self {
        // Turn the value into its raw-pointer form; the storage now owns the
        // reference count that `val` carried.
        let ptr = T::into_ptr(val);
        Self {
            ptr: AtomicPtr::new(ptr),
            _phantom_arc: PhantomData,
            strategy,
        }
    }

    /// Consumes the storage and extracts the stored value.
    pub fn into_inner(mut self) -> T {
        // Exclusive access (`mut self`), so a plain read of the pointer is fine.
        let ptr = *self.ptr.get_mut();
        // Make sure no reader still uses the pointer before handing it out.
        unsafe { self.strategy.wait_for_readers(ptr, &self.ptr) };
        // Skip `Drop`: ownership of the reference count transfers to the
        // returned value instead of being released in our destructor.
        mem::forget(self);
        unsafe { T::from_ptr(ptr) }
    }

    /// Loads an owned copy of the stored value (bumps the reference count).
    pub fn load_full(&self) -> T {
        Guard::into_inner(self.load())
    }

    /// Loads the value into a temporary [`Guard`].
    ///
    /// This is the cheap read operation; the guard keeps the value protected
    /// from destruction while it is alive.
    #[inline]
    pub fn load(&self) -> Guard<T, S> {
        // The strategy performs the protected load of the shared pointer.
        let protected = unsafe { self.strategy.load(&self.ptr) };
        Guard { inner: protected }
    }

    /// Replaces the stored value, dropping the previous one.
    pub fn store(&self, val: T) {
        drop(self.swap(val));
    }

    /// Exchanges the stored value for a new one, returning the old one.
    pub fn swap(&self, new: T) -> T {
        let new = T::into_ptr(new);
        // SeqCst: strongest ordering, keeps the swap globally ordered with
        // respect to all other SeqCst operations on this pointer.
        let old = self.ptr.swap(new, Ordering::SeqCst);
        unsafe {
            // Readers may still be looking at the old pointer; wait them out
            // before we give ownership of it (and its refcount) to the caller.
            self.strategy.wait_for_readers(old, &self.ptr);
            T::from_ptr(old)
        }
    }

    /// Swaps in `new` only if the currently stored pointer equals `current`
    /// (compared by address). Returns the previous value either way; comparing
    /// it against `current` tells whether the swap happened.
    pub fn compare_and_swap<C>(&self, current: C, new: T) -> Guard<T, S>
    where
        C: AsRaw<T::Base>,
        S: CaS<T>,
    {
        let protected = unsafe { self.strategy.compare_and_swap(&self.ptr, current, new) };
        Guard { inner: protected }
    }

    /// Read-Copy-Update: repeatedly computes a replacement from the current
    /// value and tries to install it, until no other writer intervenes.
    ///
    /// `f` may be called multiple times if there is contention, so it should
    /// be side-effect free. Returns the value that was replaced.
    pub fn rcu<R, F>(&self, mut f: F) -> T
    where
        F: FnMut(&T) -> R,
        R: Into<T>,
        S: CaS<T>,
    {
        let mut cur = self.load();
        loop {
            let new = f(&cur).into();
            let prev = self.compare_and_swap(&*cur, new);
            // If the CaS returned the pointer we based our update on, our new
            // value went in; otherwise retry from the fresher value.
            let swapped = ptr_eq(&*cur, &*prev);
            if swapped {
                return Guard::into_inner(prev);
            } else {
                cur = prev;
            }
        }
    }

    /// Creates a [`Map`] access wrapper projecting into a part of the value.
    pub fn map<I, R, F>(&self, f: F) -> Map<&Self, I, F>
    where
        F: Fn(&I) -> &R + Clone,
        Self: Access<I>,
    {
        Map::new(self, f)
    }
}
/// An atomic storage of an [`Arc`], using the default strategy.
pub type ArcSwap<T> = ArcSwapAny<Arc<T>>;

impl<T, S: Strategy<Arc<T>>> ArcSwapAny<Arc<T>, S> {
    /// Builds the storage directly from the pointed-to value, wrapping it
    /// into an [`Arc`] internally.
    pub fn from_pointee(val: T) -> Self
    where
        S: Default,
    {
        let inner = Arc::new(val);
        Self::from(inner)
    }
}
/// An atomic storage of an optional [`Arc`] (may hold "no value").
pub type ArcSwapOption<T> = ArcSwapAny<Option<Arc<T>>>;

impl<T, S: Strategy<Option<Arc<T>>>> ArcSwapAny<Option<Arc<T>>, S> {
    /// Builds the storage from the pointed-to value (or the absence of one),
    /// wrapping any present value into an [`Arc`].
    pub fn from_pointee<V: Into<Option<T>>>(val: V) -> Self
    where
        S: Default,
    {
        let maybe: Option<T> = val.into();
        Self::new(maybe.map(Arc::new))
    }

    /// Creates a storage holding no value.
    pub fn empty() -> Self
    where
        S: Default,
    {
        Self::new(None)
    }
}
impl<T> ArcSwapOption<T> {
    /// A `const` constructor for an empty (`None`) storage, usable in statics.
    ///
    /// Only provided for the default strategy: a `const fn` cannot call
    /// `Default::default`, so the strategy instance is spelled out literally.
    /// NOTE(review): this assumes `DefaultStrategy` is
    /// `HybridStrategy<DefaultConfig>` — the literal must match the alias.
    pub const fn const_empty() -> Self {
        Self {
            // A null pointer encodes `None`.
            ptr: AtomicPtr::new(ptr::null_mut()),
            _phantom_arc: PhantomData,
            strategy: HybridStrategy {
                _config: DefaultConfig,
            },
        }
    }
}
// Hidden from documentation; presumably kept only for backwards compatibility
// with older users of this alias — TODO confirm before removing.
#[doc(hidden)]
pub type IndependentArcSwap<T> = ArcSwapAny<Arc<T>, IndependentStrategy>;
/// An atomic storage of a [`Weak`](std::sync::Weak) pointer; only available
/// with the `weak` feature enabled.
#[cfg(feature = "weak")]
pub type ArcSwapWeak<T> = ArcSwapAny<std::sync::Weak<T>>;
/// Generates a full test suite for the given strategy type, so the same
/// battery of tests can be instantiated for several strategies.
macro_rules! t {
    ($name: ident, $strategy: ty) => {
        #[cfg(test)]
        mod $name {
            use std::panic;
            use std::sync::atomic::{self, AtomicUsize};

            use adaptive_barrier::{Barrier, PanicMode};
            use crossbeam_utils::thread;

            use super::*;

            const ITERATIONS: usize = 10;

            // Shorthand aliases parameterized by the tested strategy.
            #[allow(deprecated)]
            type As<T> = ArcSwapAny<Arc<T>, $strategy>;
            #[allow(deprecated)]
            type Aso<T> = ArcSwapAny<Option<Arc<T>>, $strategy>;

            /// Readers spin until they observe the configuration published by
            /// a writer thread; repeated several times to stress the exchange.
            #[test]
            #[cfg_attr(miri, ignore)] // Not run under miri.
            fn publish() {
                const READERS: usize = 2;
                for _ in 0..ITERATIONS {
                    let config = As::<String>::default();
                    let ended = AtomicUsize::new(0);
                    thread::scope(|scope| {
                        for _ in 0..READERS {
                            scope.spawn(|_| loop {
                                let cfg = config.load_full();
                                if !cfg.is_empty() {
                                    assert_eq!(*cfg, "New configuration");
                                    ended.fetch_add(1, Ordering::Relaxed);
                                    return;
                                }
                                atomic::spin_loop_hint();
                            });
                        }
                        scope.spawn(|_| {
                            let new_conf = Arc::new("New configuration".to_owned());
                            config.store(new_conf);
                        });
                    })
                    .unwrap();
                    assert_eq!(READERS, ended.load(Ordering::Relaxed));
                    // After the threads finish: one count in `config`, one in `arc`.
                    let arc = config.load_full();
                    assert_eq!(2, Arc::strong_count(&arc));
                    assert_eq!(0, Arc::weak_count(&arc));
                }
            }

            /// Basic swap/load round trips, checking values and reference counts.
            #[test]
            fn swap_load() {
                for _ in 0..100 {
                    let arc = Arc::new(42);
                    let arc_swap = As::from(Arc::clone(&arc));
                    assert_eq!(42, **arc_swap.load());
                    // Loading again still gives the same value.
                    assert_eq!(42, **arc_swap.load());
                    let new_arc = Arc::new(0);
                    assert_eq!(42, *arc_swap.swap(Arc::clone(&new_arc)));
                    assert_eq!(0, **arc_swap.load());
                    // `new_arc` is now counted from: the local, the storage, `loaded`.
                    let loaded = arc_swap.load_full();
                    assert_eq!(3, Arc::strong_count(&loaded));
                    assert_eq!(0, Arc::weak_count(&loaded));
                    // The original `arc` was released by the swap above.
                    assert_eq!(1, Arc::strong_count(&arc));
                    assert_eq!(0, Arc::weak_count(&arc));
                }
            }

            /// Multiple writers store increasing (writer, seq) pairs; readers
            /// verify each writer's observed sequence never goes backwards.
            #[test]
            fn multi_writers() {
                let first_value = Arc::new((0, 0));
                let shared = As::from(Arc::clone(&first_value));
                const WRITER_CNT: usize = 2;
                const READER_CNT: usize = 3;
                // Fewer iterations under miri; it is much slower there.
                #[cfg(miri)]
                const ITERATIONS: usize = 5;
                #[cfg(not(miri))]
                const ITERATIONS: usize = 100;
                const SEQ: usize = 50;
                let barrier = Barrier::new(PanicMode::Poison);
                thread::scope(|scope| {
                    for w in 0..WRITER_CNT {
                        let mut barrier = barrier.clone();
                        let shared = &shared;
                        let first_value = &first_value;
                        scope.spawn(move |_| {
                            for _ in 0..ITERATIONS {
                                // Reset round: everyone starts from `first_value`.
                                barrier.wait();
                                shared.store(Arc::clone(&first_value));
                                barrier.wait();
                                for i in 0..SEQ {
                                    shared.store(Arc::new((w, i + 1)));
                                }
                            }
                        });
                    }
                    for _ in 0..READER_CNT {
                        let mut barrier = barrier.clone();
                        let shared = &shared;
                        let first_value = &first_value;
                        scope.spawn(move |_| {
                            for _ in 0..ITERATIONS {
                                barrier.wait();
                                barrier.wait();
                                let mut previous = [0; WRITER_CNT];
                                let mut last = Arc::clone(&first_value);
                                loop {
                                    let cur = shared.load();
                                    // Skip if nothing changed since last look.
                                    if Arc::ptr_eq(&last, &cur) {
                                        atomic::spin_loop_hint();
                                        continue;
                                    }
                                    let (w, s) = **cur;
                                    assert!(previous[w] < s, "{:?} vs {:?}", previous, cur);
                                    previous[w] = s;
                                    last = Guard::into_inner(cur);
                                    if s == SEQ {
                                        break;
                                    }
                                }
                            }
                        });
                    }
                    drop(barrier);
                })
                .unwrap();
            }

            /// An empty optional storage loads as `None`; storing makes it `Some`.
            #[test]
            fn load_null() {
                let shared = Aso::<usize>::default();
                let guard = shared.load();
                assert!(guard.is_none());
                shared.store(Some(Arc::new(42)));
                assert_eq!(42, **shared.load().as_ref().unwrap());
            }

            /// `into_inner` hands the value out while an outstanding guard
            /// still holds its own reference.
            #[test]
            fn from_into() {
                let a = Arc::new(42);
                let shared = As::new(a);
                let guard = shared.load();
                let a = shared.into_inner();
                assert_eq!(42, *a);
                // One count in `a`, one held by the guard.
                assert_eq!(2, Arc::strong_count(&a));
                drop(guard);
                assert_eq!(1, Arc::strong_count(&a));
            }

            // Records each drop by bumping the shared counter.
            #[derive(Default)]
            struct ReportDrop(Arc<AtomicUsize>);
            impl Drop for ReportDrop {
                fn drop(&mut self) {
                    self.0.fetch_add(1, Ordering::Relaxed);
                }
            }

            /// A value replaced while a guard in another thread still uses it
            /// must not be dropped until that guard goes away.
            #[test]
            fn guard_drop_in_thread() {
                for _ in 0..ITERATIONS {
                    let cnt = Arc::new(AtomicUsize::new(0));
                    let shared = As::from_pointee(ReportDrop(cnt.clone()));
                    assert_eq!(cnt.load(Ordering::Relaxed), 0, "Dropped prematurely");
                    let sync = Barrier::new(PanicMode::Poison);
                    thread::scope(|scope| {
                        scope.spawn({
                            let sync = sync.clone();
                            |_| {
                                let mut sync = sync;
                                let guard = shared.load();
                                sync.wait();
                                // The other thread replaces the value here.
                                sync.wait();
                                drop(guard);
                                assert_eq!(cnt.load(Ordering::Relaxed), 1, "Value not dropped");
                                sync.wait();
                            }
                        });
                        scope.spawn(|_| {
                            let mut sync = sync;
                            sync.wait();
                            shared.store(Default::default());
                            assert_eq!(
                                cnt.load(Ordering::Relaxed),
                                0,
                                "Dropped while still in use"
                            );
                            sync.wait();
                            sync.wait();
                            assert_eq!(cnt.load(Ordering::Relaxed), 1, "Value not dropped");
                        });
                    })
                    .unwrap();
                }
            }

            /// A guard can outlive the storage and be dropped in a different
            /// thread; the value dies only with the guard.
            #[test]
            fn guard_drop_in_another_thread() {
                for _ in 0..ITERATIONS {
                    let cnt = Arc::new(AtomicUsize::new(0));
                    let shared = As::from_pointee(ReportDrop(cnt.clone()));
                    assert_eq!(cnt.load(Ordering::Relaxed), 0, "Dropped prematurely");
                    let guard = shared.load();
                    drop(shared);
                    assert_eq!(cnt.load(Ordering::Relaxed), 0, "Dropped prematurely");
                    thread::scope(|scope| {
                        scope.spawn(|_| {
                            drop(guard);
                        });
                    })
                    .unwrap();
                    assert_eq!(cnt.load(Ordering::Relaxed), 1, "Not dropped");
                }
            }

            /// Loading an optional value produces an `Option` after unwrapping
            /// the guard.
            #[test]
            fn load_option() {
                let shared = Aso::from_pointee(42);
                let opt: Option<_> = Guard::into_inner(shared.load());
                assert_eq!(42, *opt.unwrap());
                shared.store(None);
                assert!(shared.load().is_none());
            }

            /// Debug formatting is delegated to the inner value (wrapped for
            /// the storage, transparent for the guard).
            #[test]
            fn debug_impl() {
                let shared = As::from_pointee(42);
                assert_eq!("ArcSwapAny(42)", &format!("{:?}", shared));
                assert_eq!("42", &format!("{:?}", shared.load()));
            }

            /// Display formatting is fully transparent for both storage and guard.
            #[test]
            fn display_impl() {
                let shared = As::from_pointee(42);
                assert_eq!("42", &format!("{}", shared));
                assert_eq!("42", &format!("{}", shared.load()));
            }

            // Compile-time check: storage, references to it, guards and
            // references to guards can all cross thread boundaries.
            fn _check_stuff_is_send_sync() {
                let shared = As::from_pointee(42);
                let moved = As::from_pointee(42);
                let shared_ref = &shared;
                let lease = shared.load();
                let lease_ref = &lease;
                let lease = shared.load();
                thread::scope(|s| {
                    s.spawn(move |_| {
                        let _ = lease;
                        let _ = lease_ref;
                        let _ = shared_ref;
                        let _ = moved;
                    });
                })
                .unwrap();
            }

            /// `rcu` may be re-entered on the same storage from within the
            /// update closure without deadlocking.
            #[test]
            fn recursive() {
                let shared = ArcSwap::from(Arc::new(0));
                shared.rcu(|i| {
                    if **i < 10 {
                        shared.rcu(|i| **i + 1);
                    }
                    **i
                });
                assert_eq!(10, **shared.load());
                assert_eq!(2, Arc::strong_count(&shared.load_full()));
            }

            /// A panic inside the `rcu` closure must not poison the storage.
            #[test]
            fn rcu_panic() {
                let shared = ArcSwap::from(Arc::new(0));
                assert!(panic::catch_unwind(|| shared.rcu(|_| -> usize { panic!() })).is_err());
                assert_eq!(1, Arc::strong_count(&shared.swap(Arc::new(42))));
            }

            /// Compare-and-swap against and with null (`None`) values.
            #[test]
            fn nulls() {
                let shared = ArcSwapOption::from(Some(Arc::new(0)));
                let orig = shared.swap(None);
                assert_eq!(1, Arc::strong_count(&orig.unwrap()));
                let null = shared.load();
                assert!(null.is_none());
                let a = Arc::new(42);
                // Succeeds: the stored value is currently null.
                let orig = shared.compare_and_swap(ptr::null(), Some(Arc::clone(&a)));
                assert!(orig.is_none());
                assert_eq!(2, Arc::strong_count(&a));
                // Fails: the stored value is `a`, not `None`; returns `a`.
                let orig = Guard::into_inner(shared.compare_and_swap(&None::<Arc<_>>, None));
                assert_eq!(3, Arc::strong_count(&a));
                assert!(ptr_eq(&a, &orig));
            }

            /// Concurrent `rcu` increments from many threads must all land.
            #[test]
            fn rcu() {
                const ITERATIONS: usize = 50;
                const THREADS: usize = 10;
                let shared = ArcSwap::from(Arc::new(0));
                thread::scope(|scope| {
                    for _ in 0..THREADS {
                        scope.spawn(|_| {
                            for _ in 0..ITERATIONS {
                                shared.rcu(|old| **old + 1);
                            }
                        });
                    }
                })
                .unwrap();
                assert_eq!(THREADS * ITERATIONS, **shared.load());
            }

            /// Tracks exact reference counts around `compare_and_swap`, both
            /// with and without extra outstanding guards (`fillup`).
            #[test]
            fn cas_ref_cnt() {
                #[cfg(miri)]
                const ITERATIONS: usize = 10;
                #[cfg(not(miri))]
                const ITERATIONS: usize = 50;
                let shared = ArcSwap::from(Arc::new(0));
                for i in 0..ITERATIONS {
                    let orig = shared.load_full();
                    assert_eq!(i, *orig);
                    if i % 2 == 1 {
                        // One count in `orig`, one in `shared`.
                        assert_eq!(2, Arc::strong_count(&orig));
                    }
                    let n1 = Arc::new(i + 1);
                    // On even iterations, hold a batch of extra guards.
                    let fillup = || {
                        if i % 2 == 0 {
                            Some((0..ITERATIONS).map(|_| shared.load()).collect::<Vec<_>>())
                        } else {
                            None
                        }
                    };
                    let guards = fillup();
                    // Succeeds: `orig` is still the stored value.
                    let prev = shared.compare_and_swap(&orig, Arc::clone(&n1));
                    assert!(ptr_eq(&orig, &prev));
                    drop(guards);
                    assert_eq!(2, Arc::strong_count(&orig));
                    assert_eq!(2, Arc::strong_count(&n1));
                    assert_eq!(i + 1, **shared.load());
                    let n2 = Arc::new(i);
                    drop(prev);
                    let guards = fillup();
                    // Fails: the stored value is `n1` now, not `orig`.
                    let prev = Guard::into_inner(shared.compare_and_swap(&orig, Arc::clone(&n2)));
                    drop(guards);
                    assert!(ptr_eq(&n1, &prev));
                    assert_eq!(1, Arc::strong_count(&orig));
                    assert_eq!(3, Arc::strong_count(&n1));
                    assert_eq!(1, Arc::strong_count(&n2));
                    assert_eq!(i + 1, **shared.load());
                }
                let a = shared.load_full();
                assert_eq!(2, Arc::strong_count(&a));
                drop(shared);
                assert_eq!(1, Arc::strong_count(&a));
            }
        }
    };
}
// Instantiate the shared test battery for the default strategy.
t!(tests_default, DefaultStrategy);

// Additional instantiation against an internal test-only strategy that
// exhausts the fast slots, behind its feature gate.
#[cfg(all(feature = "internal-test-strategies", test))]
#[allow(deprecated)]
mod internal_strategies {
    use super::*;
    t!(
        tests_full_slots,
        crate::strategy::test_strategies::FillFastSlots
    );
}
/// Tests tied to the exact guard/reference-count bookkeeping of the default
/// strategy, so they live outside the strategy-generic `t!` battery.
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks when loads do and do not contribute to the `Arc` strong count.
    #[test]
    fn load_cnt() {
        let a = Arc::new(0);
        let shared = ArcSwap::from(Arc::clone(&a));
        // One count in `a`, one in `shared`.
        assert_eq!(2, Arc::strong_count(&a));
        let guard = shared.load();
        assert_eq!(0, **guard);
        // A (fast-slot) guard does not bump the count.
        assert_eq!(2, Arc::strong_count(&a));
        let guard_2 = shared.load();
        // Replacing the stored value while guards exist: `guard_2` now has to
        // hold its own reference.
        shared.store(Arc::new(1));
        assert_eq!(3, Arc::strong_count(&a));
        drop(guard_2);
        assert_eq!(2, Arc::strong_count(&a));
        let _b = Arc::clone(&guard);
        assert_eq!(3, Arc::strong_count(&a));
        drop(guard);
        assert_eq!(2, Arc::strong_count(&a));
        // The guard keeps the new value alive even after the storage is gone.
        let guard = shared.load();
        assert_eq!(1, **guard);
        drop(shared);
        assert_eq!(1, **guard);
        let ptr = Arc::clone(&guard);
        assert_eq!(2, Arc::strong_count(&ptr));
        drop(guard);
        assert_eq!(1, Arc::strong_count(&ptr));
    }

    /// Holding many guards at once forces the fallback path where further
    /// guards take full references — the strong count grows past 2.
    #[test]
    fn lease_overflow() {
        #[cfg(miri)]
        const GUARD_COUNT: usize = 100;
        #[cfg(not(miri))]
        const GUARD_COUNT: usize = 1000;
        let a = Arc::new(0);
        let shared = ArcSwap::from(Arc::clone(&a));
        assert_eq!(2, Arc::strong_count(&a));
        let mut guards = (0..GUARD_COUNT).map(|_| shared.load()).collect::<Vec<_>>();
        let count = Arc::strong_count(&a);
        // Some of the many guards had to take real references.
        assert!(count > 2);
        let guard = shared.load();
        assert_eq!(count + 1, Arc::strong_count(&a));
        drop(guard);
        assert_eq!(count, Arc::strong_count(&a));
        // Dropping one overflowed guard frees a slot; the next load can use it
        // without changing the count.
        guards.swap_remove(0);
        assert_eq!(count, Arc::strong_count(&a));
        let _guard = shared.load();
        assert_eq!(count, Arc::strong_count(&a));
    }
}