use crate::Atomic;
#[cfg(feature = "alloc")]
use alloc::sync::Arc;
use core::{
future::Future,
marker::PhantomData,
ops::Deref,
pin::Pin,
sync::atomic::Ordering,
task::{Context, Poll},
};
use futures::stream::{FusedStream, Stream};
/// Constructors for the subscriber handles defined in this module.
impl<T: Copy> Atomic<T> {
    /// Moves this atomic into a new [`Arc`] and returns a [`Subscriber`] that owns it.
    #[cfg(feature = "alloc")]
    pub fn subscribe(self) -> Subscriber<T> {
        Arc::new(self).subscribe_arc()
    }

    /// Builds a [`Subscriber`] from an atomic that is already held in an [`Arc`].
    #[cfg(feature = "alloc")]
    pub fn subscribe_arc(self: Arc<Self>) -> Subscriber<T> {
        Subscriber::new(self)
    }

    /// Builds a [`RefSubscriber`] that borrows this atomic.
    ///
    /// The `'_` in the return type spells out that the subscriber borrows from
    /// `self`, in line with the other borrowing return types of this module
    /// (`Wait<'_, ..>`, `WaitAndUpdate<'_, ..>`).
    pub fn subscribe_ref(&self) -> RefSubscriber<'_, T> {
        RefSubscriber::new(self)
    }
}
/// Handle for observing an [`Atomic`] through any holder that can be borrowed as one.
///
/// * `T` — value type stored in the atomic.
/// * `D` — the handle actually stored (the aliases in this module use an `Arc`
///   or a shared reference).
/// * `R` — the type `D` borrows as, which in turn borrows as `Atomic<T>`;
///   defaults to `Atomic<T>` itself.
pub struct GenericSubscriber<T, D, R = Atomic<T>>
where
    T: Copy,
    D: AsRef<R>,
    R: AsRef<Atomic<T>>,
{
    /// Handle through which the atomic is reached.
    inner: D,
    /// Marker that ties the otherwise unused `T` and `R` parameters to the type.
    _ghost: PhantomData<(T, R)>,
}
/// [`GenericSubscriber`] whose handle is an [`Arc`] around the atomic.
///
/// Only available when the `alloc` feature is enabled.
#[cfg(feature = "alloc")]
pub type Subscriber<T> = GenericSubscriber<T, Arc<Atomic<T>>>;
/// [`GenericSubscriber`] whose handle is a shared reference (`&'a Atomic<T>`) to the atomic.
pub type RefSubscriber<'a, T> = GenericSubscriber<T, &'a Atomic<T>>;
impl<T, D, R> GenericSubscriber<T, D, R>
where
    T: Copy,
    D: AsRef<R>,
    R: AsRef<Atomic<T>>,
{
    /// Wraps the handle `atomic_ref` in a subscriber.
    pub fn new(atomic_ref: D) -> Self {
        let inner = atomic_ref;
        Self {
            inner,
            _ghost: PhantomData,
        }
    }

    /// Returns a [`Wait`] future over the underlying atomic, driven by `pred`.
    ///
    /// The future borrows this subscriber mutably for as long as it lives.
    pub fn wait<F: FnMut(T) -> bool>(&mut self, pred: F) -> Wait<'_, T, F> {
        // Resolve `D -> R -> Atomic<T>` once and hand the plain reference over.
        let owner: &Atomic<T> = self.inner.as_ref().as_ref();
        Wait { owner, pred }
    }

    /// Returns a [`WaitAndUpdate`] future over the underlying atomic, driven by `map`.
    ///
    /// The future borrows this subscriber mutably for as long as it lives.
    pub fn wait_and_update<F: FnMut(T) -> Option<T>>(&mut self, map: F) -> WaitAndUpdate<'_, T, F> {
        let owner: &Atomic<T> = self.inner.as_ref().as_ref();
        WaitAndUpdate { owner, map }
    }
}
impl<T, D, R> GenericSubscriber<T, D, R>
where
    T: Copy + PartialEq,
    D: AsRef<R>,
    R: AsRef<Atomic<T>>,
{
    /// Converts the subscriber into a [`Changed`] that has not yet seen any value.
    pub fn changed(self) -> Changed<T, D, R> {
        let Self { inner, .. } = self;
        Changed {
            inner,
            prev: None,
            _ghost: PhantomData,
        }
    }

    /// Same conversion as [`changed`](Self::changed), under a stream-oriented name.
    pub fn into_stream(self) -> Changed<T, D, R> {
        Self::changed(self)
    }
}
/// Dereferences a subscriber to the handle `D` it stores.
impl<T, D, R> Deref for GenericSubscriber<T, D, R>
where
    T: Copy,
    D: AsRef<R>,
    R: AsRef<Atomic<T>>,
{
    type Target = D;

    fn deref(&self) -> &D {
        let Self { inner, .. } = self;
        inner
    }
}
/// Borrows the stored handle `D` from a subscriber.
impl<T, D, R> AsRef<D> for GenericSubscriber<T, D, R>
where
    T: Copy,
    D: AsRef<R>,
    R: AsRef<Atomic<T>>,
{
    fn as_ref(&self) -> &D {
        let handle: &D = &self.inner;
        handle
    }
}
/// Future returned by [`GenericSubscriber::wait`].
///
/// On every poll it registers the task's waker with the borrowed [`Atomic`],
/// loads the current value and completes with that value once `pred` returns
/// `true` for it.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Wait<'a, T: Copy, F: FnMut(T) -> bool> {
    /// Atomic whose value is being observed.
    owner: &'a Atomic<T>,
    /// Predicate that decides whether a loaded value completes the future.
    pred: F,
}
// `Wait` is declared `Unpin` for every `F`; its `poll` reaches the fields
// through ordinary (unpinned) access.
impl<'a, T, F> Unpin for Wait<'a, T, F>
where
    T: Copy,
    F: FnMut(T) -> bool,
{
}
impl<'a, T, F> Future for Wait<'a, T, F>
where
    T: Copy,
    F: FnMut(T) -> bool,
{
    /// The first loaded value accepted by the predicate.
    type Output = T;

    /// Registers the waker, loads the value and tests it against the predicate.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let atomic = self.owner;

        // The waker is registered before the value is read, on every poll.
        atomic.waker.register(cx.waker());
        let current = atomic.value.load(Ordering::Acquire);

        if !(self.pred)(current) {
            return Poll::Pending;
        }
        Poll::Ready(current)
    }
}
/// Future returned by [`GenericSubscriber::wait_and_update`].
///
/// On every poll it registers the task's waker with the borrowed [`Atomic`]
/// and runs `fetch_update` on the atomic's value with `map`; it completes as
/// soon as that call returns `Ok`.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct WaitAndUpdate<'a, T: Copy, F: FnMut(T) -> Option<T>> {
    /// Atomic whose value is being observed and updated.
    owner: &'a Atomic<T>,
    /// Update function handed to `fetch_update`; presumably `None` means
    /// "leave the value alone and keep waiting" — confirm against `Atomic`.
    map: F,
}
// `WaitAndUpdate` is declared `Unpin` for every `F`; its `poll` reaches the
// fields through ordinary (unpinned) access.
impl<'a, T, F> Unpin for WaitAndUpdate<'a, T, F>
where
    T: Copy,
    F: FnMut(T) -> Option<T>,
{
}
impl<'a, T, F> Future for WaitAndUpdate<'a, T, F>
where
    T: Copy,
    F: FnMut(T) -> Option<T>,
{
    /// The `Ok` payload of `fetch_update` (presumably the value that was
    /// stored before the update, as with the standard atomics — confirm).
    type Output = T;

    /// Registers the waker, then attempts the update once.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = &mut *self;

        // The waker is registered before the update is attempted, on every poll.
        this.owner.waker.register(cx.waker());

        let outcome = this
            .owner
            .value
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, &mut this.map);

        if let Ok(previous) = outcome {
            return Poll::Ready(previous);
        }
        Poll::Pending
    }
}
/// Future and stream produced by [`GenericSubscriber::changed`] and
/// [`GenericSubscriber::into_stream`].
///
/// Each poll loads the atomic's current value and yields it whenever it
/// differs from the value recorded by the previous poll; the very first poll
/// therefore yields immediately.
#[must_use = "futures and streams do nothing unless polled"]
pub struct Changed<T: Copy + PartialEq, D: AsRef<R>, R: AsRef<Atomic<T>> = Atomic<T>> {
    /// Handle through which the atomic is reached.
    inner: D,
    /// Value loaded by the most recent poll, `None` before the first poll.
    prev: Option<T>,
    /// Marker that ties the otherwise unused `R` parameter to the type.
    _ghost: PhantomData<R>,
}
/// Dereferences a [`Changed`] to the handle `D` it stores.
impl<T, D, R> Deref for Changed<T, D, R>
where
    T: Copy + PartialEq,
    D: AsRef<R>,
    R: AsRef<Atomic<T>>,
{
    type Target = D;

    fn deref(&self) -> &D {
        let Self { inner, .. } = self;
        inner
    }
}
// `Changed` is declared `Unpin` for every handle type `D`; its `poll` reaches
// the fields through ordinary (unpinned) access.
impl<T, D, R> Unpin for Changed<T, D, R>
where
    T: Copy + PartialEq,
    D: AsRef<R>,
    R: AsRef<Atomic<T>>,
{
}
impl<T, D, R> Future for Changed<T, D, R>
where
    T: Copy + PartialEq,
    D: AsRef<R>,
    R: AsRef<Atomic<T>>,
{
    /// The freshly loaded value that differed from the previously recorded one.
    type Output = T;

    /// Registers the waker, loads the value and reports it if it changed.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        let this = &mut *self;
        let atomic: &Atomic<T> = this.inner.as_ref().as_ref();

        // The waker is registered before the value is read, on every poll.
        atomic.waker.register(cx.waker());
        let current = atomic.value.load(Ordering::Acquire);

        // The loaded value is always recorded, whether or not it is reported.
        let previous = this.prev.replace(current);
        if previous != Some(current) {
            return Poll::Ready(current);
        }
        Poll::Pending
    }
}
impl<T, D, R> Stream for Changed<T, D, R>
where
    T: Copy + PartialEq,
    D: AsRef<R>,
    R: AsRef<Atomic<T>>,
{
    /// Every value reported by the [`Future`] implementation.
    type Item = T;

    /// Delegates to [`Future::poll`] and wraps a ready value in `Some`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
        match Future::poll(self, cx) {
            Poll::Ready(value) => Poll::Ready(Some(value)),
            Poll::Pending => Poll::Pending,
        }
    }
}
impl<T, D, R> FusedStream for Changed<T, D, R>
where
    T: Copy + PartialEq,
    D: AsRef<R>,
    R: AsRef<Atomic<T>>,
{
    /// Always `false`: `poll_next` only ever yields `Some(..)` or pends, so
    /// the stream never reports its end.
    fn is_terminated(&self) -> bool {
        false
    }
}