use crate::AsyncAtomic;
use atomig::Atom;
use core::{
future::Future,
ops::Deref,
pin::Pin,
sync::atomic::Ordering,
task::{Context, Poll},
};
use futures::stream::{FusedStream, Stream};
use pin_project_lite::pin_project;
/// Common interface for anything that can hand out a reference to an
/// [`AsyncAtomic`].
///
/// Besides the required accessor, the trait offers constructors for the
/// asynchronous adapters [`Wait`], [`WaitAndUpdate`] and [`Changed`].
pub trait AsyncAtomicRef {
    /// Type of the value stored in the underlying atomic.
    type Item: Atom;

    /// Returns the [`AsyncAtomic`] this handle refers to.
    fn as_atomic(&self) -> &AsyncAtomic<Self::Item>;

    /// Creates a future that completes once `pred` returns `true` for the
    /// value loaded from the atomic while the future is being polled.
    ///
    /// The returned future borrows `self`.
    fn wait<F>(&self, pred: F) -> Wait<&Self, F>
    where
        F: FnMut(Self::Item) -> bool,
    {
        Wait { pred, inner: self }
    }

    /// Creates a future that, on every poll, tries to update the atomic with
    /// `map` via `fetch_update`.
    ///
    /// The future stays pending while the update is rejected (i.e. `map`
    /// returns `None`) and resolves with the value reported by a successful
    /// `fetch_update`. The returned future borrows `self`.
    fn wait_and_update<F>(&self, map: F) -> WaitAndUpdate<&Self, F>
    where
        F: FnMut(Self::Item) -> Option<Self::Item>,
    {
        WaitAndUpdate { map, inner: self }
    }

    /// Consumes the handle and turns it into a [`Changed`] adapter.
    ///
    /// The adapter starts without a remembered value, so its first poll
    /// reports whatever the atomic currently holds.
    fn changed(self) -> Changed<Self>
    where
        Self: Sized,
        Self::Item: PartialEq + Clone,
    {
        Changed {
            prev: None,
            inner: self,
        }
    }
}
/// An [`AsyncAtomic`] acts as a handle to itself.
impl<T> AsyncAtomicRef for AsyncAtomic<T>
where
    T: Atom,
{
    type Item = T;

    /// Returns `self` unchanged.
    fn as_atomic(&self) -> &AsyncAtomic<T> {
        self
    }
}
/// Any type that dereferences to an [`AsyncAtomicRef`] implementor is itself
/// an [`AsyncAtomicRef`]; this covers plain references as well as other
/// `Deref` wrappers.
impl<R> AsyncAtomicRef for R
where
    R: Deref,
    R::Target: AsyncAtomicRef,
{
    type Item = <R::Target as AsyncAtomicRef>::Item;

    /// Forwards to the dereferenced target.
    fn as_atomic(&self) -> &AsyncAtomic<Self::Item> {
        <R::Target as AsyncAtomicRef>::as_atomic(&**self)
    }
}
// NOTE(review): this inherent impl is empty and adds no methods; it looks like
// a leftover placeholder for `PartialEq`-specific helpers — confirm whether it
// can be removed.
impl<T: Atom + PartialEq> AsyncAtomic<T> {}
/// Future returned by [`AsyncAtomicRef::wait`].
///
/// Every poll registers the task's waker with the underlying atomic, loads the
/// current value and completes with `()` as soon as `pred` accepts it.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Wait<R: AsyncAtomicRef, F: FnMut(R::Item) -> bool> {
    /// Handle to the atomic being observed.
    pub inner: R,
    /// Predicate deciding whether the loaded value ends the wait.
    pub pred: F,
}
// `Wait` never relies on its own address staying stable: `poll` only touches
// the fields through ordinary mutable access, so it can be `Unpin` regardless
// of `R` and `F`.
impl<R, F> Unpin for Wait<R, F>
where
    R: AsyncAtomicRef,
    F: FnMut(R::Item) -> bool,
{
}
impl<R, F> Future for Wait<R, F>
where
    R: AsyncAtomicRef,
    F: FnMut(R::Item) -> bool,
{
    type Output = ();

    /// Completes once the predicate accepts the value currently stored in the
    /// atomic; otherwise stays pending.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        // `Wait` is `Unpin`, so plain mutable access is fine.
        let this = self.get_mut();
        let atomic = this.inner.as_atomic();

        // Register the waker *before* loading the value, so a store that lands
        // between the two steps can still wake this task.
        atomic.waker.register(cx.waker());
        let current = atomic.value.load(Ordering::Acquire);

        if !(this.pred)(current) {
            return Poll::Pending;
        }
        Poll::Ready(())
    }
}
pin_project! {
    /// Future returned by [`AsyncAtomicRef::wait_and_update`].
    ///
    /// Every poll registers the task's waker with the underlying atomic and
    /// attempts a `fetch_update` with `map`; the future resolves with the value
    /// reported by the first successful update.
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct WaitAndUpdate<R: AsyncAtomicRef, F: FnMut(R::Item) -> Option<R::Item>> {
        // Handle to the atomic being updated.
        pub inner: R,
        // Maps the current value to its replacement, or `None` to keep waiting.
        pub map: F,
    }
}
impl<R, F> Future for WaitAndUpdate<R, F>
where
    R: AsyncAtomicRef,
    F: FnMut(R::Item) -> Option<R::Item>,
{
    type Output = R::Item;

    /// Attempts the update once per poll; resolves with the value returned by
    /// a successful `fetch_update` and stays pending while it is rejected.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<R::Item> {
        let this = self.project();
        let atomic = this.inner.as_atomic();

        // Register the waker first so a concurrent store between registration
        // and the update attempt can still wake this task.
        atomic.waker.register(cx.waker());

        let outcome = atomic
            .value
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, this.map);

        if let Ok(previous) = outcome {
            Poll::Ready(previous)
        } else {
            Poll::Pending
        }
    }
}
/// Adapter returned by [`AsyncAtomicRef::changed`], usable both as a future
/// and as a never-ending stream.
///
/// Each poll loads the current value, remembers it in `prev`, and yields it if
/// nothing was remembered before or if it differs from the remembered value.
#[must_use = "futures and streams do nothing unless you `.await` or poll them"]
pub struct Changed<R: AsyncAtomicRef<Item: PartialEq + Clone>> {
    /// Handle to the atomic being observed.
    pub inner: R,
    /// Value observed by the most recent poll; `None` before the first poll.
    pub prev: Option<R::Item>,
}
/// Exposes the wrapped handle, so a `Changed` can be used wherever the inner
/// handle's methods are needed.
impl<R> Deref for Changed<R>
where
    R: AsyncAtomicRef,
    R::Item: PartialEq + Clone,
{
    type Target = R;

    /// Borrows the wrapped handle.
    fn deref(&self) -> &R {
        &self.inner
    }
}
// `Changed` holds no self-references and `poll` only uses ordinary mutable
// access to its fields, so it is `Unpin` regardless of `R`.
impl<R> Unpin for Changed<R>
where
    R: AsyncAtomicRef,
    R::Item: PartialEq + Clone,
{
}
impl<R> Future for Changed<R>
where
    R: AsyncAtomicRef,
    R::Item: PartialEq + Clone,
{
    type Output = R::Item;

    /// Yields the current value if it is the first one observed or differs
    /// from the value seen by the previous poll; otherwise stays pending.
    ///
    /// The remembered value is overwritten on every poll, whether or not the
    /// poll reports a change.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<R::Item> {
        // `Changed` is `Unpin`, so plain mutable access is fine.
        let this = self.get_mut();
        let atomic = this.inner.as_atomic();

        // Register the waker before loading so a store that lands in between
        // can still wake this task.
        atomic.waker.register(cx.waker());
        let current = atomic.value.load(Ordering::Acquire);

        let differs = match this.prev.replace(current.clone()) {
            None => true,
            Some(seen) => seen != current,
        };

        if differs {
            Poll::Ready(current)
        } else {
            Poll::Pending
        }
    }
}
impl<R> Stream for Changed<R>
where
    R: AsyncAtomicRef,
    R::Item: PartialEq + Clone,
{
    type Item = R::Item;

    /// Delegates to the `Future` implementation and wraps every ready value
    /// in `Some`, so this stream never yields `None`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<R::Item>> {
        match Future::poll(self, cx) {
            Poll::Ready(value) => Poll::Ready(Some(value)),
            Poll::Pending => Poll::Pending,
        }
    }
}
impl<R> FusedStream for Changed<R>
where
    R: AsyncAtomicRef,
    R::Item: PartialEq + Clone,
{
    /// Always reports `false`: `poll_next` only ever produces `Some(_)` or
    /// pending, so the stream never reaches a terminated state.
    fn is_terminated(&self) -> bool {
        false
    }
}