#![doc(
html_root_url = "https://docs.rs/arc-swap/0.3.3/arc-swap/",
test(attr(deny(warnings)))
)]
#![deny(missing_docs, warnings)]
#![allow(renamed_and_removed_lints)]
mod as_raw;
mod debt;
mod ref_cnt;
use std::cell::Cell;
use std::fmt::{Debug, Display, Formatter, Result as FmtResult};
use std::marker::PhantomData;
use std::mem;
use std::ops::Deref;
use std::ptr;
use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
pub use as_raw::AsRaw;
use debt::Debt;
pub use ref_cnt::{NonNull, RefCnt};
/// A lock on one generation of one shard of the global reader counters.
///
/// Creating it increments the chosen counter; `unlock` decrements it again.
/// Writers wait until a whole generation's counters drain to zero (see
/// `ArcSwapAny::wait_for_readers`).
struct GenLock {
    // Index into SHARDS of the counter that was incremented.
    shard: usize,
    // Which of the GEN_CNT generations inside that shard was incremented.
    gen: usize,
}
impl GenLock {
    /// Registers one reader in the currently active generation.
    fn new(signal_safe: SignalSafety) -> GenLock {
        let shard = match signal_safe {
            // Inside a signal handler always use shard 0: Shard::choose
            // touches a thread-local, which is avoided here.
            SignalSafety::Safe => 0,
            SignalSafety::Unsafe => Shard::choose(),
        };
        let gen = GEN_IDX.load(Ordering::Relaxed) % GEN_CNT;
        // SeqCst: NOTE(review) — presumably pairs with the SeqCst pointer
        // swap in writers so the increment and the swap are in one global
        // order; confirm against the crate's ordering argument.
        SHARDS[shard].0[gen].fetch_add(1, Ordering::SeqCst);
        GenLock { shard, gen }
    }
    /// Deregisters the reader.
    ///
    /// Consumes `self` and `mem::forget`s it so the debug-only Drop
    /// (which flags a forgotten unlock) does not fire.
    fn unlock(self) {
        // AcqRel: release the reads performed under the lock.
        SHARDS[self.shard].0[self.gen].fetch_sub(1, Ordering::AcqRel);
        mem::forget(self);
    }
}
/// Debug-build tripwire: a GenLock must always be `unlock`ed explicitly;
/// dropping one would leak a reader count and block writers forever.
#[cfg(debug_assertions)] impl Drop for GenLock {
    fn drop(&mut self) {
        unreachable!("Forgot to unlock generation");
    }
}
/// A short-term borrow of the value stored in an `ArcSwapAny`.
///
/// Holds a `GenLock` for its whole lifetime, which makes writers'
/// `swap`/`store` wait — keep it short-lived.
pub struct Guard<'a, T: RefCnt + 'a>
where
    T::Base: 'a,
{
    // Always Some until Drop takes it out to unlock.
    lock: Option<GenLock>,
    // Raw pointer read from the ArcSwapAny while the lock was held.
    ptr: *const T::Base,
    // Ties the guard's lifetime to the ArcSwapAny it came from.
    _arc_swap: PhantomData<&'a ArcSwapAny<T>>,
}
impl<'a, T: RefCnt> Guard<'a, T> {
    /// Upgrades the guard to a full owned `T`.
    ///
    /// Bumps the reference count, so the result is independent of the
    /// guard and of the `ArcSwapAny` it came from.
    pub fn upgrade(guard: &Self) -> T {
        let res = unsafe { T::from_ptr(guard.ptr) };
        T::inc(&res);
        res
    }
    /// Converts the guard into a `Lease`, which does not block writers.
    pub fn lease(guard: &Self) -> Lease<T> {
        // Try to record the borrow as a "debt" slot keyed by the pointer.
        let debt = Debt::new(guard.ptr as usize);
        if debt.is_none() {
            // No free debt slot: pay up front by taking a real reference.
            // The into_ptr leaks that extra reference on purpose; the
            // Lease's Drop releases it later.
            let res = unsafe { T::from_ptr(guard.ptr) };
            T::inc(&res);
            T::into_ptr(res);
        }
        Lease {
            ptr: guard.ptr,
            debt,
            _data: PhantomData,
        }
    }
    /// Borrows the pointed-to value; `None` if the stored pointer is null.
    #[cfg_attr(feature = "cargo-clippy", allow(needless_lifetimes))]
    pub fn get_ref<'g>(guard: &'g Self) -> Option<&'g T::Base> {
        unsafe { guard.ptr.as_ref() }
    }
}
impl<'a, T: NonNull> Deref for Guard<'a, T> {
    type Target = T::Base;
    fn deref(&self) -> &T::Base {
        // T: NonNull promises the stored pointer is never null, so the
        // unwrap should be infallible (NOTE(review): relies on the
        // NonNull contract in the ref_cnt module — confirm there).
        unsafe { self.ptr.as_ref().unwrap() }
    }
}
impl<'a, T: RefCnt> Drop for Guard<'a, T> {
    fn drop(&mut self) {
        // lock is Some from construction until here; releasing it lets
        // writers waiting on this generation proceed.
        self.lock.take().unwrap().unlock();
    }
}
// Monotonic counter handing out thread ids, used only to spread threads
// across the shards round-robin.
static THREAD_ID_GEN: AtomicUsize = AtomicUsize::new(0);
thread_local! {
    // The shard this thread uses; starts at the out-of-range sentinel
    // SHARD_CNT meaning "not chosen yet" (see Shard::choose).
    static THREAD_SHARD: Cell<usize> = Cell::new(SHARD_CNT);
}
/// A mid-term borrow of a stored value.
///
/// Unlike `Guard`, it holds no generation lock, so it never blocks
/// writers. The borrow is backed either by a `Debt` slot (`debt` is
/// Some) or, when no slot was free, by a real extra reference count
/// (`debt` is None).
pub struct Lease<T: RefCnt> {
    ptr: *const T::Base,
    debt: Option<&'static Debt>,
    _data: PhantomData<T>,
}
impl<T: RefCnt> Lease<T> {
    /// Upgrades to a full owned `T` by bumping the reference count; the
    /// lease itself stays usable.
    pub fn upgrade(guard: &Self) -> T {
        let res = unsafe { T::from_ptr(guard.ptr) };
        T::inc(&res);
        res
    }
    /// Consumes the lease, upgrading it into a full `T`.
    ///
    /// Cheaper than `upgrade` + drop when the lease already owns a real
    /// reference (no debt): that reference is just handed over.
    #[cfg_attr(feature = "cargo-clippy", allow(wrong_self_convention))]
    pub fn into_upgrade(lease: Self) -> T {
        let res = unsafe { T::from_ptr(lease.ptr) };
        if let Some(debt) = lease.debt {
            // Only a debt slot backed the lease: take a reference now
            // and try to settle the debt ourselves.
            T::inc(&res);
            if !debt.pay::<T>(lease.ptr) {
                // Someone else already paid the debt (incrementing on
                // our behalf), so we hold one reference too many.
                unsafe { T::dec(lease.ptr) };
            }
        }
        // Skip Drop: ownership of the reference moved into `res`.
        mem::forget(lease);
        res
    }
    /// Borrows the leased value; `None` if the pointer is null.
    pub fn get_ref(lease: &Self) -> Option<&T::Base> {
        unsafe { lease.ptr.as_ref() }
    }
    /// True if the lease holds a null pointer (only possible for
    /// nullable `T`, e.g. `Option<Arc<_>>`).
    pub fn is_null(lease: &Self) -> bool {
        lease.ptr.is_null()
    }
}
/// Compares two pointer-like values (`Arc`s, `Guard`s, `Lease`s, raw
/// pointers, …) for pointer identity — not for value equality.
#[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))]
pub fn ptr_eq<Base, A, B>(a: A, b: B) -> bool
where
    A: AsRaw<Base>,
    B: AsRaw<Base>,
{
    ptr::eq(a.as_raw(), b.as_raw())
}
impl<T: NonNull> Deref for Lease<T> {
    type Target = T::Base;
    fn deref(&self) -> &T::Base {
        // T: NonNull promises the stored pointer is never null, so the
        // unwrap should be infallible (NOTE(review): relies on the
        // NonNull contract in the ref_cnt module — confirm there).
        unsafe { self.ptr.as_ref().unwrap() }
    }
}
impl<T> Debug for Lease<T>
where
    T: RefCnt,
    T::Base: Debug,
{
    /// Formats the leased value; nullable lease types are shown as the
    /// `Option` they are, others as the bare value.
    fn fmt(&self, formatter: &mut Formatter) -> FmtResult {
        match (T::can_null(), Lease::get_ref(self)) {
            (true, inner) => inner.fmt(formatter),
            (false, inner) => inner.unwrap().fmt(formatter),
        }
    }
}
impl<T> Display for Lease<T>
where
    T: NonNull,
    T::Base: Display,
{
    /// Delegates to the `Display` of the leased value.
    fn fmt(&self, formatter: &mut Formatter) -> FmtResult {
        Display::fmt(&**self, formatter)
    }
}
impl<T: RefCnt> Drop for Lease<T> {
    fn drop(&mut self) {
        if let Some(debt) = self.debt {
            // Settling the debt ourselves means no reference count was
            // ever taken for this lease — nothing more to do.
            if debt.pay::<T>(self.ptr) {
                return;
            }
            // The debt was already paid elsewhere (see Debt::pay_all in
            // wait_for_readers), leaving us a reference — fall through
            // to release it.
        }
        // Either no debt (lease owned a real reference from the start)
        // or the debt was paid for us: drop that reference.
        unsafe { T::dec(self.ptr) };
    }
}
// Number of reader generations tracked per shard.
const GEN_CNT: usize = 2;
// Whether a reader may be running inside a signal handler; signal-safe
// readers avoid the thread-local shard lookup (see GenLock::new).
#[derive(Copy, Clone)]
enum SignalSafety {
    Safe,
    Unsafe,
}
// Index (taken modulo GEN_CNT) of the generation new readers join.
static GEN_IDX: AtomicUsize = AtomicUsize::new(0);
// Number of shards the reader counters are split into, to spread
// contention between reader threads.
const SHARD_CNT: usize = 9;
// One shard: one counter per generation. The 64-byte alignment keeps
// each shard on its own cache line (presumably to avoid false sharing
// between threads on different shards).
#[repr(align(64))]
#[derive(Default)]
struct Shard([AtomicUsize; GEN_CNT]);
// A zero-initialized Shard, spelled out per element because
// `[AtomicUsize::new(0); GEN_CNT]` would require AtomicUsize: Copy.
macro_rules! sh {
    () => {
        Shard([AtomicUsize::new(0), AtomicUsize::new(0)])
    };
}
// The global reader counters: SHARD_CNT shards × GEN_CNT generations.
static SHARDS: [Shard; SHARD_CNT] = [
    sh!(),
    sh!(),
    sh!(),
    sh!(),
    sh!(),
    sh!(),
    sh!(),
    sh!(),
    sh!(),
];
impl Shard {
    /// Picks the shard for the current thread, assigning one round-robin
    /// on first use and caching it in a thread-local.
    fn choose() -> usize {
        THREAD_SHARD
            .try_with(|ts| {
                let mut val = ts.get();
                // SHARD_CNT is the "not assigned yet" sentinel.
                if val >= SHARD_CNT {
                    val = THREAD_ID_GEN.fetch_add(1, Ordering::Relaxed) % SHARD_CNT;
                    ts.set(val);
                }
                val
            })
            // The thread-local may already be destroyed during thread
            // shutdown; fall back to shard 0 instead of panicking.
            .unwrap_or(0)
    }
    /// Reads both generation counters of this shard.
    fn snapshot(&self) -> [usize; GEN_CNT] {
        [
            self.0[0].load(Ordering::Acquire),
            self.0[1].load(Ordering::Acquire),
        ]
    }
}
// In wait_for_readers, yield the thread once per this many spin
// iterations instead of only spinning.
const YIELD_EVERY: usize = 16;
/// The generic atomic storage: an atomic pointer to `T`'s base type plus
/// a marker claiming ownership of one reference of `T`.
pub struct ArcSwapAny<T: RefCnt> {
    ptr: AtomicPtr<T::Base>,
    // Owns one reference of T (dropped via T::dec in Drop).
    _phantom_arc: PhantomData<T>,
}
impl<T: RefCnt> From<T> for ArcSwapAny<T> {
fn from(val: T) -> Self {
let ptr = T::into_ptr(val);
Self {
ptr: AtomicPtr::new(ptr),
_phantom_arc: PhantomData,
}
}
}
impl<T: RefCnt> Drop for ArcSwapAny<T> {
    fn drop(&mut self) {
        // Exclusive access — a plain read through get_mut suffices.
        let ptr = *self.ptr.get_mut();
        // Outstanding leases may still reference the value via debt
        // slots; settle those before releasing our reference.
        self.wait_for_readers(ptr);
        unsafe { T::dec(ptr) };
    }
}
impl<T: RefCnt> Clone for ArcSwapAny<T> {
fn clone(&self) -> Self {
Self::from(self.load())
}
}
impl<T> Debug for ArcSwapAny<T>
where
    T: RefCnt,
    T::Base: Debug,
{
    /// Formats the currently stored value; nullable storages are shown
    /// as the `Option` they are, others as the bare value.
    fn fmt(&self, formatter: &mut Formatter) -> FmtResult {
        let guard = self.peek();
        match (T::can_null(), Guard::get_ref(&guard)) {
            (true, inner) => inner.fmt(formatter),
            (false, inner) => inner.unwrap().fmt(formatter),
        }
    }
}
impl<T> Display for ArcSwapAny<T>
where
    T: NonNull,
    T::Base: Display,
{
    /// Delegates to the `Display` of the currently stored value.
    fn fmt(&self, formatter: &mut Formatter) -> FmtResult {
        let guard = self.peek();
        Display::fmt(&*guard, formatter)
    }
}
impl<T: RefCnt> ArcSwapAny<T> {
    /// Creates a new storage holding `val`.
    pub fn new(val: T) -> Self {
        Self::from(val)
    }
    /// Consumes the storage, returning the stored value.
    pub fn into_inner(mut self) -> T {
        // Exclusive access — no atomic ordering needed.
        let ptr = *self.ptr.get_mut();
        // Outstanding leases may still reference the value through debt
        // slots; settle them before taking the value over.
        self.wait_for_readers(ptr);
        // Skip our Drop, which would wait again and release the
        // reference we are about to hand to the caller.
        mem::forget(self);
        unsafe { T::from_ptr(ptr) }
    }
    /// Loads a full owned copy (reference-count bump) of the value.
    pub fn load(&self) -> T {
        Guard::upgrade(&self.peek())
    }
    fn peek_inner(&self, signal_safe: SignalSafety) -> Guard<T> {
        // The generation lock must be taken *before* the pointer load:
        // a writer that swaps afterwards then waits for this reader.
        let gen = GenLock::new(signal_safe);
        let ptr = self.ptr.load(Ordering::Acquire);
        Guard {
            lock: Some(gen),
            _arc_swap: PhantomData,
            ptr,
        }
    }
    /// Cheap borrow of the value; blocks writers while the Guard lives.
    pub fn peek(&self) -> Guard<T> {
        self.peek_inner(SignalSafety::Unsafe)
    }
    /// Like `peek`, but avoids the thread-local shard lookup so it can
    /// be used from a signal handler.
    pub fn peek_signal_safe(&self) -> Guard<T> {
        self.peek_inner(SignalSafety::Safe)
    }
    /// Borrows the value without blocking writers (see `Lease`).
    pub fn lease(&self) -> Lease<T> {
        Guard::lease(&self.peek())
    }
    /// Replaces the stored value, dropping the previous one.
    pub fn store(&self, val: T) {
        drop(self.swap(val));
    }
    /// Atomically replaces the stored value and returns the old one.
    pub fn swap(&self, new: T) -> T {
        let new = T::into_ptr(new);
        // SeqCst: NOTE(review) — presumably pairs with the SeqCst
        // counter increment in GenLock::new; confirm the ordering
        // argument before weakening.
        let old = self.ptr.swap(new, Ordering::SeqCst);
        // No reader from before the swap may still observe `old` once
        // we hand its ownership to the caller.
        self.wait_for_readers(old);
        unsafe { T::from_ptr(old) }
    }
    /// Stores `new` only if the current pointer equals `current`;
    /// always returns (a lease of) the previous value.
    pub fn compare_and_swap<C: AsRaw<T::Base>>(&self, current: C, new: T) -> Lease<T> {
        let cur_ptr = current.as_raw();
        let new = T::into_ptr(new);
        // Hold a generation lock while building the result lease so the
        // previous value cannot be destroyed under us meanwhile.
        let gen = GenLock::new(SignalSafety::Unsafe);
        let previous_ptr = self.ptr.compare_and_swap(cur_ptr, new, Ordering::SeqCst);
        let swapped = ptr::eq(cur_ptr, previous_ptr);
        // `current` (possibly itself a Guard) is no longer needed; drop
        // it before the potentially long wait below.
        drop(current);
        let debt = if swapped {
            // The swap moved the previous value's reference out of the
            // storage; the lease owns it outright — no debt needed.
            None
        } else {
            // CAS failed; borrow the (unchanged) previous value the same
            // way Guard::lease does: debt slot, or a real reference.
            let debt = Debt::new(previous_ptr as usize);
            if debt.is_none() {
                let previous = unsafe { T::from_ptr(previous_ptr) };
                T::inc(&previous);
                T::into_ptr(previous);
            }
            debt
        };
        gen.unlock();
        if swapped {
            // The old value left the slot; wait until no reader still
            // peeks at it before handing it out inside the lease.
            self.wait_for_readers(previous_ptr);
        } else {
            // `new` was not installed — drop the reference taken above.
            unsafe { T::dec(new) };
        }
        Lease {
            ptr: previous_ptr,
            debt,
            _data: PhantomData,
        }
    }
    /// Blocks until no Guard from before the call can still observe
    /// `old`, then takes over any outstanding lease debts on it.
    fn wait_for_readers(&self, old: *const T::Base) {
        let mut seen_group = [false; GEN_CNT];
        let mut iter = 0usize;
        // Each generation must be observed empty at least once: every
        // pre-existing reader sits in one of them, so after both have
        // drained no such reader remains.
        while !seen_group.iter().all(|seen| *seen) {
            let gen = GEN_IDX.load(Ordering::Relaxed);
            // Per-generation reader totals, summed over all shards.
            let groups = SHARDS.iter().fold([0, 0], |[a1, a2], s| {
                let [v1, v2] = s.snapshot();
                [a1 + v1, a2 + v2]
            });
            let next_gen = gen.wrapping_add(1);
            if groups[next_gen % GEN_CNT] == 0 {
                // The other generation is free: rotate new readers into
                // it so the current one can drain.
                GEN_IDX.compare_and_swap(gen, next_gen, Ordering::Relaxed);
            }
            for i in 0..GEN_CNT {
                seen_group[i] = seen_group[i] || (groups[i] == 0);
            }
            iter = iter.wrapping_add(1);
            // Mostly spin, but yield occasionally so a reader scheduled
            // on the same core can make progress.
            if iter % YIELD_EVERY == 0 {
                thread::yield_now();
            } else {
                atomic::spin_loop_hint();
            }
        }
        // Settle all debts (leases) still referencing the old value.
        Debt::pay_all::<T>(old);
    }
    /// Read-copy-update: repeatedly derives a replacement from the
    /// current value and compare-and-swaps it in until it wins the
    /// race. Returns the value that was replaced.
    pub fn rcu<R, F>(&self, mut f: F) -> T
    where
        F: FnMut(&Lease<T>) -> R,
        R: Into<T>,
    {
        let mut cur = self.lease();
        loop {
            let new = f(&cur).into();
            let prev = self.compare_and_swap(&cur, new);
            let swapped = ptr_eq(&cur, &prev);
            if swapped {
                return Lease::into_upgrade(prev);
            } else {
                // Lost the race; retry against the value that won.
                cur = prev;
            }
        }
    }
}
/// Atomic storage for an `Arc<T>`.
pub type ArcSwap<T> = ArcSwapAny<Arc<T>>;
impl<T> ArcSwap<T> {
    /// Wraps `val` in a fresh `Arc` and creates the storage from it.
    pub fn from_pointee(val: T) -> Self {
        Self::from(Arc::new(val))
    }
    /// Like `rcu`, but the closure receives `&T` and the final value is
    /// unwrapped out of its `Arc` once no other reference remains.
    pub fn rcu_unwrap<R, F>(&self, mut f: F) -> T
    where
        F: FnMut(&T) -> R,
        R: Into<Arc<T>>,
    {
        let mut wrapped = self.rcu(|prev| f(&*prev));
        loop {
            match Arc::try_unwrap(wrapped) {
                Ok(val) => return val,
                Err(w) => {
                    // Some lease or clone still holds the Arc; let other
                    // threads run, then retry the unwrap.
                    wrapped = w;
                    thread::yield_now();
                }
            }
        }
    }
}
/// Atomic storage for an optional `Arc<T>` (the slot may be empty).
pub type ArcSwapOption<T> = ArcSwapAny<Option<Arc<T>>>;
impl<T> ArcSwapOption<T> {
    /// Creates the storage from anything convertible to `Option<T>`,
    /// wrapping a present value in a fresh `Arc`.
    pub fn from_pointee<V: Into<Option<T>>>(val: V) -> Self {
        let inner: Option<T> = val.into();
        Self::new(inner.map(Arc::new))
    }
    /// Creates an empty storage.
    pub fn empty() -> Self {
        Self::new(None)
    }
}
impl<T> Default for ArcSwapOption<T> {
    /// The default storage is empty.
    fn default() -> Self {
        ArcSwapOption::empty()
    }
}
#[cfg(test)]
mod tests {
    extern crate crossbeam_utils;
    use std::panic;
    use std::sync::atomic::AtomicUsize;
    use std::sync::Barrier;
    use self::crossbeam_utils::thread;
    use super::*;
    // Many readers spin until a writer publishes a new value; afterwards
    // the reference counts must be back to just the storage + our load.
    #[test]
    fn publish() {
        for _ in 0..100 {
            let config = ArcSwap::from(Arc::new(String::default()));
            let ended = AtomicUsize::new(0);
            thread::scope(|scope| {
                for _ in 0..20 {
                    scope.spawn(|| loop {
                        let cfg = config.load();
                        if !cfg.is_empty() {
                            assert_eq!(*cfg, "New configuration");
                            ended.fetch_add(1, Ordering::Relaxed);
                            return;
                        }
                        atomic::spin_loop_hint();
                    });
                }
                scope.spawn(|| {
                    let new_conf = Arc::new("New configuration".to_owned());
                    config.store(new_conf);
                });
            });
            assert_eq!(20, ended.load(Ordering::Relaxed));
            assert_eq!(2, Arc::strong_count(&config.load()));
            assert_eq!(0, Arc::weak_count(&config.load()));
        }
    }
    // Basic swap/load behavior plus reference-count bookkeeping after
    // the old value leaves the storage.
    #[test]
    fn swap_load() {
        for _ in 0..100 {
            let arc = Arc::new(42);
            let arc_swap = ArcSwap::from(Arc::clone(&arc));
            assert_eq!(42, *arc_swap.load());
            assert_eq!(42, *arc_swap.load());
            let new_arc = Arc::new(0);
            assert_eq!(42, *arc_swap.swap(Arc::clone(&new_arc)));
            assert_eq!(0, *arc_swap.load());
            assert_eq!(3, Arc::strong_count(&arc_swap.load()));
            assert_eq!(0, Arc::weak_count(&arc_swap.load()));
            assert_eq!(1, Arc::strong_count(&arc));
            assert_eq!(0, Arc::weak_count(&arc));
        }
    }
    // Multiple writers each store an increasing sequence tagged with
    // their id; readers verify they only ever observe it advancing.
    #[test]
    fn multi_writers() {
        let first_value = Arc::new((0, 0));
        let shared = ArcSwap::from(Arc::clone(&first_value));
        const WRITER_CNT: usize = 2;
        const READER_CNT: usize = 3;
        const ITERATIONS: usize = 100;
        const SEQ: usize = 50;
        let barrier = Barrier::new(READER_CNT + WRITER_CNT);
        thread::scope(|scope| {
            for w in 0..WRITER_CNT {
                let barrier = &barrier;
                let shared = &shared;
                let first_value = &first_value;
                scope.spawn(move || {
                    for _ in 0..ITERATIONS {
                        barrier.wait();
                        shared.store(Arc::clone(&first_value));
                        barrier.wait();
                        for i in 0..SEQ {
                            shared.store(Arc::new((w, i + 1)));
                        }
                    }
                });
            }
            for _ in 0..READER_CNT {
                scope.spawn(|| {
                    for _ in 0..ITERATIONS {
                        barrier.wait();
                        barrier.wait();
                        let mut previous = [0; 2];
                        let mut last = Arc::clone(&first_value);
                        loop {
                            let cur = shared.load();
                            // Same value as before — nothing new to check.
                            if Arc::ptr_eq(&last, &cur) {
                                atomic::spin_loop_hint();
                                continue;
                            }
                            let (w, s) = *cur;
                            assert!(previous[w] < s);
                            previous[w] = s;
                            last = cur;
                            if s == SEQ {
                                break;
                            }
                        }
                    }
                });
            }
        });
    }
    // compare_and_swap reference-count accounting, alternating between
    // free debt slots and (on even iterations) slots exhausted by a
    // pile of leases, which forces the real-reference fallback.
    #[test]
    fn cas_ref_cnt() {
        const ITERATIONS: usize = 50;
        let shared = ArcSwap::from(Arc::new(0));
        for i in 0..ITERATIONS {
            let orig = shared.load();
            assert_eq!(i, *orig);
            if i % 2 == 1 {
                assert_eq!(2, Arc::strong_count(&orig));
            }
            let n1 = Arc::new(i + 1);
            // On even iterations, occupy many debt slots with leases.
            let fillup = || {
                if i % 2 == 0 {
                    Some(
                        (0..50)
                            .into_iter()
                            .map(|_| shared.lease())
                            .collect::<Vec<_>>(),
                    )
                } else {
                    None
                }
            };
            let leases = fillup();
            // Successful CAS: orig was current.
            let prev = shared.compare_and_swap(&orig, Arc::clone(&n1));
            assert!(ptr_eq(&orig, &prev));
            drop(leases);
            assert_eq!(2, Arc::strong_count(&orig));
            assert_eq!(2, Arc::strong_count(&n1));
            assert_eq!(i + 1, *shared.peek());
            let n2 = Arc::new(i);
            drop(prev);
            let leases = fillup();
            // Failing CAS: orig is stale; the lease must borrow n1.
            let prev = Lease::into_upgrade(shared.compare_and_swap(&orig, Arc::clone(&n2)));
            drop(leases);
            assert!(ptr_eq(&n1, &prev));
            assert_eq!(1, Arc::strong_count(&orig));
            assert_eq!(3, Arc::strong_count(&n1));
            assert_eq!(1, Arc::strong_count(&n2));
            assert_eq!(i + 1, *shared.peek());
        }
        let a = shared.load();
        assert_eq!(2, Arc::strong_count(&a));
        drop(shared);
        assert_eq!(1, Arc::strong_count(&a));
    }
    // Concurrent rcu increments must all be applied exactly once.
    #[test]
    fn rcu() {
        const ITERATIONS: usize = 50;
        const THREADS: usize = 10;
        let shared = ArcSwap::from(Arc::new(0));
        thread::scope(|scope| {
            for _ in 0..THREADS {
                scope.spawn(|| {
                    for _ in 0..ITERATIONS {
                        shared.rcu(|old| **old + 1);
                    }
                });
            }
        });
        assert_eq!(THREADS * ITERATIONS, *shared.load());
    }
    // Same as rcu, but through rcu_unwrap (which also unwraps the Arc).
    #[test]
    fn rcu_unwrap() {
        const ITERATIONS: usize = 50;
        const THREADS: usize = 10;
        let shared = ArcSwap::from(Arc::new(0));
        thread::scope(|scope| {
            for _ in 0..THREADS {
                scope.spawn(|| {
                    for _ in 0..ITERATIONS {
                        shared.rcu_unwrap(|old| *old + 1);
                    }
                });
            }
        });
        assert_eq!(THREADS * ITERATIONS, *shared.load());
    }
    // ArcSwapOption: storing/swapping None and CAS against a null slot.
    #[test]
    fn nulls() {
        let shared = ArcSwapOption::from(Some(Arc::new(0)));
        let orig = shared.swap(None);
        assert_eq!(1, Arc::strong_count(&orig.unwrap()));
        let null = shared.load();
        assert!(null.is_none());
        let a = Arc::new(42);
        let orig = shared.compare_and_swap(ptr::null(), Some(Arc::clone(&a)));
        assert!(Lease::is_null(&orig));
        assert_eq!(2, Arc::strong_count(&a));
        let orig = Lease::into_upgrade(shared.compare_and_swap(&None::<Arc<_>>, None));
        assert_eq!(3, Arc::strong_count(&a));
        assert!(ptr_eq(&a, &orig));
    }
    // rcu may be re-entered from inside its own closure without
    // deadlocking (leases don't block writers).
    #[test]
    fn recursive() {
        let shared = ArcSwap::from(Arc::new(0));
        shared.rcu(|i| {
            if **i < 10 {
                shared.rcu(|i| **i + 1);
            }
            **i
        });
        assert_eq!(10, *shared.peek());
        assert_eq!(2, Arc::strong_count(&shared.load()));
    }
    // A panicking rcu closure must not leak a reference to the value.
    #[test]
    fn rcu_panic() {
        let shared = ArcSwap::from(Arc::new(0));
        assert!(panic::catch_unwind(|| shared.rcu(|_| -> usize { panic!() })).is_err());
        assert_eq!(1, Arc::strong_count(&shared.swap(Arc::new(42))));
    }
    // Leases don't bump the count while their value is still stored,
    // only once it has been replaced (the debt gets paid).
    #[test]
    fn lease_cnt() {
        let a = Arc::new(0);
        let shared = ArcSwap::from(Arc::clone(&a));
        assert_eq!(2, Arc::strong_count(&a));
        let lease = shared.lease();
        assert_eq!(0, *lease);
        assert_eq!(2, Arc::strong_count(&a));
        let lease_2 = shared.lease();
        shared.store(Arc::new(1));
        assert_eq!(3, Arc::strong_count(&a));
        drop(lease_2);
        assert_eq!(2, Arc::strong_count(&a));
        let _b = Lease::upgrade(&lease);
        assert_eq!(3, Arc::strong_count(&a));
        drop(lease);
        assert_eq!(2, Arc::strong_count(&a));
        let lease = shared.lease();
        assert_eq!(1, *lease);
        drop(shared);
        assert_eq!(1, *lease);
        let ptr = Lease::upgrade(&lease);
        assert_eq!(2, Arc::strong_count(&ptr));
        drop(lease);
        assert_eq!(1, Arc::strong_count(&ptr));
    }
    // With more leases than debt slots, the surplus falls back to real
    // reference counts; a freed slot can then be reused.
    #[test]
    fn lease_overflow() {
        let a = Arc::new(0);
        let shared = ArcSwap::from(Arc::clone(&a));
        assert_eq!(2, Arc::strong_count(&a));
        let mut leases = (0..1000)
            .into_iter()
            .map(|_| shared.lease())
            .collect::<Vec<_>>();
        let count = Arc::strong_count(&a);
        assert!(count > 2);
        let lease = shared.lease();
        assert_eq!(count + 1, Arc::strong_count(&a));
        drop(lease);
        assert_eq!(count, Arc::strong_count(&a));
        leases.swap_remove(0);
        let _lease = shared.lease();
        assert_eq!(count, Arc::strong_count(&a));
    }
    // Leasing an empty ArcSwapOption yields a null lease.
    #[test]
    fn lease_null() {
        let shared = ArcSwapOption::<usize>::from(None);
        let lease = shared.lease();
        assert!(Lease::get_ref(&lease).is_none());
        shared.store(Some(Arc::new(42)));
        assert_eq!(42, *Lease::get_ref(&shared.lease()).unwrap());
    }
    // into_inner works even while a lease is outstanding; the lease
    // keeps its (paid) reference until dropped.
    #[test]
    fn from_into() {
        let a = Arc::new(42);
        let shared = ArcSwap::new(a);
        let lease = shared.lease();
        let a = shared.into_inner();
        assert_eq!(42, *a);
        assert_eq!(2, Arc::strong_count(&a));
        drop(lease);
        assert_eq!(1, Arc::strong_count(&a));
    }
}