async_atomic/
async_.rs

1use crate::AsyncAtomic;
2use atomig::Atom;
3use core::{
4    future::Future,
5    ops::Deref,
6    pin::Pin,
7    sync::atomic::Ordering,
8    task::{Context, Poll},
9};
10use futures::stream::{FusedStream, Stream};
11use pin_project_lite::pin_project;
12
13/// Generic reference to async atomic.
14///
15/// Contains `async` methods which returns futures that wait for atomic value change.
16///
17/// *After one of the futures `poll`ed, all previously `poll`ed futures will not wake.*
18/// *This may cause a deadlock, however it is not an UB, so these methods are safe.*
19pub trait AsyncAtomicRef {
20    /// Type stored in atomic.
21    type Item: Atom;
22
23    /// Get reference to original atomic structure.
24    fn as_atomic(&self) -> &AsyncAtomic<Self::Item>;
25
26    /// Asynchronously wait for predicate to be `true`.
27    fn wait<F: FnMut(Self::Item) -> bool>(&self, pred: F) -> Wait<&Self, F> {
28        Wait { inner: self, pred }
29    }
30
31    /// Asynchronously wait until `map` returned `Some(x)` and then store `x` in atomic.
32    ///
33    /// This is an asynchronous version of [`fetch_update`][`AsyncAtomic::fetch_update`].
34    fn wait_and_update<F: FnMut(Self::Item) -> Option<Self::Item>>(
35        &self,
36        map: F,
37    ) -> WaitAndUpdate<&Self, F> {
38        WaitAndUpdate { inner: self, map }
39    }
40
41    /// Convert subscriber into stream that yields when value is changed.
42    fn changed(self) -> Changed<Self>
43    where
44        Self: Sized,
45        Self::Item: PartialEq + Clone,
46    {
47        Changed {
48            inner: self,
49            prev: None,
50        }
51    }
52}
53
54impl<T: Atom> AsyncAtomicRef for AsyncAtomic<T> {
55    type Item = T;
56    fn as_atomic(&self) -> &AsyncAtomic<Self::Item> {
57        self
58    }
59}
60
61impl<R: Deref<Target: AsyncAtomicRef>> AsyncAtomicRef for R {
62    type Item = <R::Target as AsyncAtomicRef>::Item;
63    fn as_atomic(&self) -> &AsyncAtomic<Self::Item> {
64        self.deref().as_atomic()
65    }
66}
67
// NOTE(review): this inherent impl declares no items; it looks like a leftover
// placeholder — confirm whether it can be removed.
impl<T: Atom + PartialEq> AsyncAtomic<T> {}
69
70/// Future to wait for specific value.
71pub struct Wait<R: AsyncAtomicRef, F: FnMut(R::Item) -> bool> {
72    pub inner: R,
73    pub pred: F,
74}
75
76impl<R: AsyncAtomicRef, F: FnMut(R::Item) -> bool> Unpin for Wait<R, F> {}
77
78impl<R: AsyncAtomicRef, F: FnMut(R::Item) -> bool> Future for Wait<R, F> {
79    type Output = ();
80
81    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
82        let atomic = self.inner.as_atomic();
83        atomic.waker.register(cx.waker());
84        let value = atomic.value.load(Ordering::Acquire);
85        // TODO: Evaluate predicate on store to avoid spurious wake-ups.
86        if (self.pred)(value) {
87            Poll::Ready(())
88        } else {
89            Poll::Pending
90        }
91    }
92}
93
pin_project! {
    /// Future that waits until `map` returns `Some(x)` and then stores `x` in the atomic.
    ///
    /// Neither field is marked `#[pin]`, so the projection exposes both as plain `&mut`.
    pub struct WaitAndUpdate<R: AsyncAtomicRef, F: FnMut(R::Item) -> Option<R::Item>> {
        // Handle through which the underlying atomic is reached.
        pub inner: R,
        // Update closure handed to `fetch_update` on every poll.
        pub map: F,
    }
}
101
102impl<R: AsyncAtomicRef, F: FnMut(R::Item) -> Option<R::Item>> Future for WaitAndUpdate<R, F> {
103    type Output = R::Item;
104
105    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
106        let mut this = self.project();
107        let atomic = this.inner.as_atomic();
108        atomic.waker.register(cx.waker());
109        match atomic
110            .value
111            .fetch_update(Ordering::AcqRel, Ordering::Acquire, &mut this.map)
112        {
113            Ok(x) => Poll::Ready(x),
114            Err(_) => Poll::Pending,
115        }
116    }
117}
118
119/// Stream that yields value when it change.
120pub struct Changed<R: AsyncAtomicRef<Item: PartialEq + Clone>> {
121    pub inner: R,
122    pub prev: Option<R::Item>,
123}
124
125impl<R: AsyncAtomicRef<Item: PartialEq + Clone>> Deref for Changed<R> {
126    type Target = R;
127    fn deref(&self) -> &Self::Target {
128        &self.inner
129    }
130}
131
132impl<R: AsyncAtomicRef<Item: PartialEq + Clone>> Unpin for Changed<R> {}
133
134impl<R: AsyncAtomicRef<Item: PartialEq + Clone>> Future for Changed<R> {
135    type Output = R::Item;
136
137    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
138        let atomic = self.inner.as_atomic();
139        atomic.waker.register(cx.waker());
140        let value = atomic.value.load(Ordering::Acquire);
141        if self
142            .prev
143            .replace(value.clone())
144            .is_none_or(|prev| prev != value)
145        {
146            Poll::Ready(value)
147        } else {
148            Poll::Pending
149        }
150    }
151}
152
153impl<R: AsyncAtomicRef<Item: PartialEq + Clone>> Stream for Changed<R> {
154    type Item = R::Item;
155
156    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<R::Item>> {
157        self.poll(cx).map(Some)
158    }
159}
160
161impl<R: AsyncAtomicRef<Item: PartialEq + Clone>> FusedStream for Changed<R> {
162    fn is_terminated(&self) -> bool {
163        false
164    }
165}