1use crate::AsyncAtomic;
2use atomig::Atom;
3use core::{
4 future::Future,
5 ops::Deref,
6 pin::Pin,
7 sync::atomic::Ordering,
8 task::{Context, Poll},
9};
10use futures::stream::{FusedStream, Stream};
11use pin_project_lite::pin_project;
12
13pub trait AsyncAtomicRef {
20 type Item: Atom;
22
23 fn as_atomic(&self) -> &AsyncAtomic<Self::Item>;
25
26 fn wait<F: FnMut(Self::Item) -> bool>(&self, pred: F) -> Wait<&Self, F> {
28 Wait { inner: self, pred }
29 }
30
31 fn wait_and_update<F: FnMut(Self::Item) -> Option<Self::Item>>(
35 &self,
36 map: F,
37 ) -> WaitAndUpdate<&Self, F> {
38 WaitAndUpdate { inner: self, map }
39 }
40
41 fn changed(self) -> Changed<Self>
43 where
44 Self: Sized,
45 Self::Item: PartialEq + Clone,
46 {
47 Changed {
48 inner: self,
49 prev: None,
50 }
51 }
52}
53
54impl<T: Atom> AsyncAtomicRef for AsyncAtomic<T> {
55 type Item = T;
56 fn as_atomic(&self) -> &AsyncAtomic<Self::Item> {
57 self
58 }
59}
60
61impl<R: Deref<Target: AsyncAtomicRef>> AsyncAtomicRef for R {
62 type Item = <R::Target as AsyncAtomicRef>::Item;
63 fn as_atomic(&self) -> &AsyncAtomic<Self::Item> {
64 self.deref().as_atomic()
65 }
66}
67
68impl<T: Atom + PartialEq> AsyncAtomic<T> {}
69
70pub struct Wait<R: AsyncAtomicRef, F: FnMut(R::Item) -> bool> {
72 pub inner: R,
73 pub pred: F,
74}
75
76impl<R: AsyncAtomicRef, F: FnMut(R::Item) -> bool> Unpin for Wait<R, F> {}
77
78impl<R: AsyncAtomicRef, F: FnMut(R::Item) -> bool> Future for Wait<R, F> {
79 type Output = ();
80
81 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
82 let atomic = self.inner.as_atomic();
83 atomic.waker.register(cx.waker());
84 let value = atomic.value.load(Ordering::Acquire);
85 if (self.pred)(value) {
87 Poll::Ready(())
88 } else {
89 Poll::Pending
90 }
91 }
92}
93
94pin_project! {
95 pub struct WaitAndUpdate<R: AsyncAtomicRef, F: FnMut(R::Item) -> Option<R::Item>> {
97 pub inner: R,
98 pub map: F,
99 }
100}
101
102impl<R: AsyncAtomicRef, F: FnMut(R::Item) -> Option<R::Item>> Future for WaitAndUpdate<R, F> {
103 type Output = R::Item;
104
105 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
106 let mut this = self.project();
107 let atomic = this.inner.as_atomic();
108 atomic.waker.register(cx.waker());
109 match atomic
110 .value
111 .fetch_update(Ordering::AcqRel, Ordering::Acquire, &mut this.map)
112 {
113 Ok(x) => Poll::Ready(x),
114 Err(_) => Poll::Pending,
115 }
116 }
117}
118
119pub struct Changed<R: AsyncAtomicRef<Item: PartialEq + Clone>> {
121 pub inner: R,
122 pub prev: Option<R::Item>,
123}
124
125impl<R: AsyncAtomicRef<Item: PartialEq + Clone>> Deref for Changed<R> {
126 type Target = R;
127 fn deref(&self) -> &Self::Target {
128 &self.inner
129 }
130}
131
132impl<R: AsyncAtomicRef<Item: PartialEq + Clone>> Unpin for Changed<R> {}
133
134impl<R: AsyncAtomicRef<Item: PartialEq + Clone>> Future for Changed<R> {
135 type Output = R::Item;
136
137 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
138 let atomic = self.inner.as_atomic();
139 atomic.waker.register(cx.waker());
140 let value = atomic.value.load(Ordering::Acquire);
141 if self
142 .prev
143 .replace(value.clone())
144 .is_none_or(|prev| prev != value)
145 {
146 Poll::Ready(value)
147 } else {
148 Poll::Pending
149 }
150 }
151}
152
153impl<R: AsyncAtomicRef<Item: PartialEq + Clone>> Stream for Changed<R> {
154 type Item = R::Item;
155
156 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<R::Item>> {
157 self.poll(cx).map(Some)
158 }
159}
160
161impl<R: AsyncAtomicRef<Item: PartialEq + Clone>> FusedStream for Changed<R> {
162 fn is_terminated(&self) -> bool {
163 false
164 }
165}