async_atomic/
atomic.rs

1use atomig::{
2    impls::{PrimitiveAtom, PrimitiveAtomInteger, PrimitiveAtomLogic},
3    Atom, AtomInteger, AtomLogic, Atomic as BasicAtomic,
4};
5use core::sync::atomic::Ordering;
6use futures::task::AtomicWaker;
7
8/// Atomic value that also contains [`Waker`](`core::task::Waker`) to notify subscriber asynchronously.
9///
10/// *There is only a single waker, so there should be only single subscription at a time.*
11/// *Otherwise older subscriptions will not receive updates anymore.*
12#[derive(Default, Debug)]
13pub struct AsyncAtomic<T: Atom> {
14    pub(crate) value: BasicAtomic<T>,
15    pub(crate) waker: AtomicWaker,
16}
17
18impl<T: Atom> AsyncAtomic<T> {
19    pub fn new(value: T) -> Self {
20        Self {
21            value: BasicAtomic::new(value),
22            waker: AtomicWaker::new(),
23        }
24    }
25
26    pub const fn from_impl(repr: <T::Repr as PrimitiveAtom>::Impl) -> Self {
27        Self {
28            value: BasicAtomic::from_impl(repr),
29            waker: AtomicWaker::new(),
30        }
31    }
32
33    pub fn load(&self) -> T {
34        self.value.load(Ordering::Acquire)
35    }
36
37    pub fn store(&self, val: T) {
38        self.value.store(val, Ordering::Release);
39        self.waker.wake();
40    }
41
42    pub fn swap(&self, val: T) -> T {
43        let old = self.value.swap(val, Ordering::AcqRel);
44        self.waker.wake();
45        old
46    }
47
48    pub fn compare_exchange(&self, current: T, new: T) -> Result<T, T> {
49        self.value
50            .compare_exchange(current, new, Ordering::AcqRel, Ordering::Acquire)
51            .inspect(|_| self.waker.wake())
52    }
53
54    pub fn fetch_update<F: FnMut(T) -> Option<T>>(&self, f: F) -> Result<T, T> {
55        self.value
56            .fetch_update(Ordering::AcqRel, Ordering::Acquire, f)
57            .inspect(|_| self.waker.wake())
58    }
59}
60
61impl<T: AtomLogic> AsyncAtomic<T>
62where
63    T::Repr: PrimitiveAtomLogic,
64{
65    pub fn fetch_and(&self, val: T) -> T {
66        let old = self.value.fetch_and(val, Ordering::AcqRel);
67        self.waker.wake();
68        old
69    }
70    pub fn fetch_or(&self, val: T) -> T {
71        let old = self.value.fetch_or(val, Ordering::AcqRel);
72        self.waker.wake();
73        old
74    }
75    pub fn fetch_xor(&self, val: T) -> T {
76        let old = self.value.fetch_xor(val, Ordering::AcqRel);
77        self.waker.wake();
78        old
79    }
80}
81
82impl<T: AtomInteger> AsyncAtomic<T>
83where
84    T::Repr: PrimitiveAtomInteger,
85{
86    pub fn fetch_add(&self, val: T) -> T {
87        let old = self.value.fetch_add(val, Ordering::AcqRel);
88        self.waker.wake();
89        old
90    }
91    pub fn fetch_sub(&self, val: T) -> T {
92        let old = self.value.fetch_sub(val, Ordering::AcqRel);
93        self.waker.wake();
94        old
95    }
96    pub fn fetch_max(&self, val: T) -> T {
97        let old = self.value.fetch_max(val, Ordering::AcqRel);
98        self.waker.wake();
99        old
100    }
101    pub fn fetch_min(&self, val: T) -> T {
102        let old = self.value.fetch_min(val, Ordering::AcqRel);
103        self.waker.wake();
104        old
105    }
106}
107
108impl<T: Atom> AsRef<AsyncAtomic<T>> for AsyncAtomic<T> {
109    fn as_ref(&self) -> &AsyncAtomic<T> {
110        self
111    }
112}