#![allow(clippy::module_name_repetitions)]
pub mod cell;
pub mod progressable_cell;
use std::{
cell::RefCell,
fmt,
ops::{Deref, DerefMut},
};
use futures::{
channel::{mpsc, oneshot},
future::LocalBoxFuture,
stream::{self, LocalBoxStream, StreamExt as _},
};
use crate::subscribers_store::{
progressable, progressable::Processed, SubscribersStore,
};
#[doc(inline)]
pub use self::{cell::ObservableCell, progressable_cell::ProgressableCell};
/// Default subscribers store: a list of [`UniversalSubscriber`]s kept behind
/// a [`RefCell`] so it can be extended through a shared reference.
type DefaultSubscribers<D> = RefCell<Vec<UniversalSubscriber<D>>>;
/// [`ObservableField`] that uses the default [`UniversalSubscriber`] list as
/// its subscribers store.
pub type Observable<D> = ObservableField<D, DefaultSubscribers<D>>;
/// [`ObservableField`] that uses [`progressable::SubStore`] as its
/// subscribers store.
pub type Progressable<D> = ObservableField<D, progressable::SubStore<D>>;
/// Data of type `D` paired with a subscribers store of type `S`.
///
/// The data is readable through [`Deref`]. Mutation goes through
/// [`ObservableField::borrow_mut`], whose guard notifies the store when the
/// value differs after the mutation.
#[derive(Debug)]
pub struct ObservableField<D, S> {
/// The wrapped value.
data: D,
/// Store of the subscribers interested in changes of `data`.
subs: S,
}
impl<D> ObservableField<D, RefCell<Vec<UniversalSubscriber<D>>>>
where
D: 'static,
{
#[inline]
pub fn new(data: D) -> Self {
Self {
data,
subs: RefCell::new(Vec::new()),
}
}
}
impl<D, S> ObservableField<D, S>
where
    D: 'static,
    S: Whenable<D>,
{
    /// Returns a future tied to the `assert_fn` predicate.
    ///
    /// If `assert_fn` already returns `true` for the current data, the
    /// returned future is immediately ready with `Ok(())`. Otherwise the
    /// predicate is boxed and handed over to the subscribers store, which
    /// produces the future.
    pub fn when<F>(
        &self,
        assert_fn: F,
    ) -> LocalBoxFuture<'static, Result<(), DroppedError>>
    where
        F: Fn(&D) -> bool + 'static,
    {
        if !assert_fn(&self.data) {
            // Not satisfied yet: let the store track the predicate.
            return self.subs.when(Box::new(assert_fn));
        }
        Box::pin(futures::future::ok::<(), DroppedError>(()))
    }
}
impl<D: 'static> Progressable<D> {
    /// Creates a new [`Progressable`] holding `data` with a default
    /// [`progressable::SubStore`].
    #[inline]
    pub fn new(data: D) -> Self {
        let subs = progressable::SubStore::default();
        Self { data, subs }
    }
}
impl<D> Progressable<D>
where
    D: Clone + 'static,
{
    /// Returns a stream of [`progressable::Guarded`] values.
    ///
    /// The first item is a clone of the current data wrapped by the
    /// subscribers store; the rest comes from the store's own subscription.
    pub fn subscribe(
        &self,
    ) -> LocalBoxStream<'static, progressable::Guarded<D>> {
        let initial = self.subs.wrap(self.data.clone());
        let updates = self.subs.subscribe();
        let head = stream::once(async move { initial });
        Box::pin(head.chain(updates))
    }

    /// Returns the [`Processed`] future produced by the subscribers store.
    ///
    /// NOTE(review): presumably resolves once every emitted update has been
    /// processed by the subscribers — confirm against
    /// [`progressable::SubStore`].
    pub fn when_all_processed(&self) -> Processed<'static> {
        let store = &self.subs;
        store.when_all_processed()
    }
}
impl<D> Observable<D>
where
    D: Clone + 'static,
{
    /// Returns a stream of the values of this [`Observable`].
    ///
    /// The first item is a clone of the current data. Every later item is a
    /// clone of the data sent on modification. The sending half is stored in
    /// the subscribers list of this field.
    pub fn subscribe(&self) -> LocalBoxStream<'static, D> {
        let data = self.data.clone();
        let (tx, rx) = mpsc::unbounded();
        self.subs
            .borrow_mut()
            .push(UniversalSubscriber::Subscribe(tx));
        // The receiver already is a `Stream`, so it is chained as is: only
        // the resulting stream needs to be boxed and pinned.
        Box::pin(stream::once(async move { data }).chain(rx))
    }
}
impl<D, S> ObservableField<D, S>
where
    D: PartialEq + 'static,
    S: Whenable<D>,
{
    /// Shortcut for [`ObservableField::when`] with a predicate checking that
    /// the data is equal to `should_be`.
    #[inline]
    pub fn when_eq(
        &self,
        should_be: D,
    ) -> LocalBoxFuture<'static, Result<(), DroppedError>> {
        // The expected value is moved into the predicate.
        let is_expected = move |current: &D| *current == should_be;
        self.when(is_expected)
    }
}
impl<D, S> ObservableField<D, S>
where
    S: OnObservableFieldModification<D>,
    D: Clone + PartialEq,
{
    /// Returns a guard giving mutable access to the data.
    ///
    /// The guard keeps a clone of the data taken before any mutation, so it
    /// can detect on drop whether the value has actually changed.
    #[inline]
    pub fn borrow_mut(&mut self) -> MutObservableFieldGuard<'_, D, S> {
        // Snapshot first, then split `self` into its two mutable halves.
        let value_before_mutation = self.data.clone();
        let Self { data, subs } = self;
        MutObservableFieldGuard {
            data,
            subs,
            value_before_mutation,
        }
    }
}
/// Subscribers store that can be notified about a modification of the data
/// of an [`ObservableField`].
pub trait OnObservableFieldModification<D> {
/// Called by [`MutObservableFieldGuard`] on its drop with the new `data`,
/// but only if the data differs from its value before the mutation.
fn on_modify(&mut self, data: &D);
}
/// Subscriber kept in the default subscribers store of an [`Observable`].
pub enum UniversalSubscriber<D> {
/// One-shot subscriber created by [`Whenable::when`]: it is notified and
/// removed once `assert_fn` returns `true` for the modified data.
When {
/// Sending half of the one-shot channel; taken out when the
/// notification is sent.
sender: RefCell<Option<oneshot::Sender<()>>>,
/// Predicate checked against the data on every modification.
assert_fn: Box<dyn Fn(&D) -> bool>,
},
/// Subscriber created by [`Observable::subscribe`]: it receives a clone
/// of the data on every modification.
Subscribe(mpsc::UnboundedSender<D>),
}
impl<D> fmt::Debug for UniversalSubscriber<D> {
    /// Writes only the name of the variant: neither the channel halves nor
    /// the boxed predicate are printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let variant = match self {
            UniversalSubscriber::When { .. } => "UniversalSubscriber::When",
            UniversalSubscriber::Subscribe(_) => {
                "UniversalSubscriber::Subscribe"
            }
        };
        f.write_str(variant)
    }
}
/// Error meaning that the observable value has been dropped before the
/// awaited condition was met.
#[derive(Clone, Copy, Debug)]
pub struct DroppedError;

impl fmt::Display for DroppedError {
    #[inline]
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Observable value has been dropped")
    }
}

// Lets callers use `DroppedError` wherever a standard error is expected, for
// example behind `Box<dyn std::error::Error>`.
impl std::error::Error for DroppedError {}
impl From<oneshot::Canceled> for DroppedError {
    /// Converts a cancellation of the one-shot channel into a
    /// [`DroppedError`]; the cancellation itself carries nothing to keep.
    #[inline]
    fn from(_canceled: oneshot::Canceled) -> Self {
        Self
    }
}
/// Subscribers store that can produce a future for a predicate over the data
/// of an [`ObservableField`].
pub trait Whenable<D: 'static> {
/// Called by [`ObservableField::when`] when `assert_fn` does not hold for
/// the current data yet.
///
/// The default store resolves the returned future once `assert_fn` returns
/// `true` for modified data, or with a [`DroppedError`] if the one-shot
/// channel behind it is canceled.
fn when(
&self,
assert_fn: Box<dyn Fn(&D) -> bool>,
) -> LocalBoxFuture<'static, Result<(), DroppedError>>;
}
#[allow(clippy::use_self)]
impl<D: 'static> Whenable<D> for RefCell<Vec<UniversalSubscriber<D>>> {
fn when(
&self,
assert_fn: Box<dyn Fn(&D) -> bool>,
) -> LocalBoxFuture<'static, Result<(), DroppedError>> {
let (tx, rx) = oneshot::channel();
self.borrow_mut().push(UniversalSubscriber::When {
sender: RefCell::new(Some(tx)),
assert_fn,
});
Box::pin(async move { Ok(rx.await?) })
}
}
impl<D: Clone + 'static> OnObservableFieldModification<D>
    for progressable::SubStore<D>
{
    /// Hands a clone of the modified `data` over to the store's
    /// `send_update`.
    fn on_modify(&mut self, data: &D) {
        let update = data.clone();
        self.send_update(update);
    }
}
impl<D: Clone> OnObservableFieldModification<D>
    for RefCell<Vec<UniversalSubscriber<D>>>
{
    /// Notifies the subscribers about the modified `data` and removes the
    /// ones which are not needed anymore.
    ///
    /// - [`UniversalSubscriber::When`] is removed if nobody waits for it
    ///   anymore (its receiver has been dropped or it has been notified
    ///   already), or once its predicate holds for `data`, in which case the
    ///   waiting side is notified first.
    /// - [`UniversalSubscriber::Subscribe`] receives a clone of `data` and is
    ///   removed if sending fails.
    fn on_modify(&mut self, data: &D) {
        // `&mut self` already guarantees exclusive access, so no runtime
        // borrow check is needed to reach the list.
        self.get_mut().retain(|sub| match sub {
            UniversalSubscriber::When { assert_fn, sender } => {
                let mut slot = sender.borrow_mut();
                let is_awaited =
                    slot.as_ref().map_or(false, |tx| !tx.is_canceled());
                if !is_awaited {
                    // Without this check a subscriber whose future has been
                    // dropped would stay in the list until its predicate
                    // holds, which may never happen.
                    return false;
                }
                if (assert_fn)(data) {
                    if let Some(tx) = slot.take() {
                        // Sending fails only if the receiver is gone, and
                        // then there is nobody to notify.
                        let _ = tx.send(());
                    }
                    false
                } else {
                    true
                }
            }
            UniversalSubscriber::Subscribe(sender) => {
                sender.unbounded_send(data.clone()).is_ok()
            }
        });
    }
}
impl<D, S> Deref for ObservableField<D, S> {
    type Target = D;

    /// Gives read-only access to the wrapped data.
    #[inline]
    fn deref(&self) -> &Self::Target {
        let Self { data, .. } = self;
        data
    }
}
impl<D, S> fmt::Display for ObservableField<D, S>
where
    D: fmt::Display,
{
    /// Formats the wrapped data with its own [`fmt::Display`]
    /// implementation; the subscribers store is not printed.
    #[inline]
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        <D as fmt::Display>::fmt(&self.data, f)
    }
}
/// Guard returned by [`ObservableField::borrow_mut`] which gives mutable
/// access to the data of the field.
///
/// On drop it compares the data with `value_before_mutation` and notifies
/// the subscribers store only if they differ.
// The bounds are repeated on the struct itself because its `Drop`
// implementation relies on them.
#[derive(Debug)]
pub struct MutObservableFieldGuard<'a, D, S>
where
S: OnObservableFieldModification<D>,
D: PartialEq,
{
/// Data of the field being mutated.
data: &'a mut D,
/// Subscribers store to notify about the modification.
subs: &'a mut S,
/// Clone of the data taken when the guard was created.
value_before_mutation: D,
}
impl<'a, D, S> Deref for MutObservableFieldGuard<'a, D, S>
where
    S: OnObservableFieldModification<D>,
    D: PartialEq,
{
    type Target = D;

    /// Gives read-only access to the guarded data.
    #[inline]
    fn deref(&self) -> &Self::Target {
        &*self.data
    }
}
impl<'a, D, S> DerefMut for MutObservableFieldGuard<'a, D, S>
where
    S: OnObservableFieldModification<D>,
    D: PartialEq,
{
    /// Gives mutable access to the guarded data.
    #[inline]
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut *self.data
    }
}
impl<'a, D, S> Drop for MutObservableFieldGuard<'a, D, S>
where
    S: OnObservableFieldModification<D>,
    D: PartialEq,
{
    /// Notifies the subscribers store about the modification, but only if
    /// the data differs from the value it had when the guard was created.
    #[inline]
    fn drop(&mut self) {
        let is_changed = *self.data != self.value_before_mutation;
        if is_changed {
            self.subs.on_modify(&*self.data);
        }
    }
}
// Tests of the `Observable` and `Progressable` fields.
#[cfg(test)]
mod tests {
use std::{cell::RefCell, time::Duration};
use futures::{poll, task::Poll, StreamExt as _};
use tokio::time::timeout;
use crate::{Observable, Progressable};
// The first item of a subscription is the current value of the field.
#[tokio::test]
async fn subscriber_receives_current_data() {
let field = Observable::new(9);
let current_data = field.subscribe().next().await.unwrap();
assert_eq!(current_data, 9);
}
// `when_eq` resolves at once if the field already holds the expected value.
#[tokio::test]
async fn when_eq_resolves_if_value_eq_already() {
let field = Observable::new(9);
field.when_eq(9).await.unwrap();
}
// `when_eq` stays pending (the timeout fires) while the value differs.
#[tokio::test]
async fn when_eq_doesnt_resolve_if_value_is_not_eq() {
let field = Observable::new(9);
let _ = timeout(Duration::from_millis(50), field.when_eq(0))
.await
.unwrap_err();
}
// `when` checks its predicate against the current value on the call.
#[tokio::test]
async fn current_value_is_provided_into_assert_fn_on_when_call() {
let field = Observable::new(9);
timeout(Duration::from_millis(50), field.when(|val| val == &9))
.await
.unwrap()
.unwrap();
}
// Values set through `borrow_mut` reach an existing subscription.
#[tokio::test]
async fn value_updates_are_sent_to_subs() {
let mut field = Observable::new(0);
let mut subscription_on_changes = field.subscribe();
for _ in 0..100 {
*field.borrow_mut() += 1;
}
// Skip the items until the final value shows up.
loop {
if let Some(change) = subscription_on_changes.next().await {
if change == 100 {
break;
}
} else {
panic!("Stream ended too early!");
}
}
}
// A pending `when` resolves once an update satisfies its predicate.
#[tokio::test]
async fn when_resolves_on_value_update() {
let mut field = Observable::new(0);
let subscription = field.when(|change| change == &100);
for _ in 0..100 {
*field.borrow_mut() += 1;
}
timeout(Duration::from_millis(50), subscription)
.await
.unwrap()
.unwrap();
}
// A pending `when_eq` resolves once an update equals the expected value.
#[tokio::test]
async fn when_eq_resolves_on_value_update() {
let mut field = Observable::new(0);
let subscription = field.when_eq(100);
for _ in 0..100 {
*field.borrow_mut() += 1;
}
timeout(Duration::from_millis(50), subscription)
.await
.unwrap()
.unwrap();
}
// Dropping the field makes a pending `when` resolve with an error.
#[tokio::test]
async fn when_returns_dropped_error_on_drop() {
let field = Observable::new(0);
let subscription = field.when(|change| change == &100);
drop(field);
let _ = subscription.await.unwrap_err();
}
// Dropping the field makes a pending `when_eq` resolve with an error.
#[tokio::test]
async fn when_eq_returns_dropped_error_on_drop() {
let field = Observable::new(0);
let subscription = field.when_eq(100);
drop(field);
let _ = subscription.await.unwrap_err();
}
// Once the field is dropped, the stream ends after its initial item.
#[tokio::test]
async fn stream_ends_when_reactive_field_is_dropped() {
let field = Observable::new(0);
let subscription = field.subscribe();
drop(field);
assert!(subscription.skip(1).next().await.is_none());
}
// Assigning a value equal to the current one emits no update.
#[tokio::test]
async fn no_update_should_be_emitted_on_field_mutation() {
let mut field = Observable::new(0);
let subscription = field.subscribe();
*field.borrow_mut() = 0;
let _ = timeout(
Duration::from_millis(50),
Box::pin(subscription.skip(1).next()),
)
.await
.unwrap_err();
}
// Several assignments through one guard emit only the final value.
#[tokio::test]
async fn only_last_update_should_be_sent_to_subscribers() {
let mut field = Observable::new(0);
let subscription = field.subscribe();
let mut field_mut_guard = field.borrow_mut();
*field_mut_guard = 100;
*field_mut_guard = 200;
*field_mut_guard = 300;
drop(field_mut_guard);
assert_eq!(subscription.skip(1).next().await.unwrap(), 300);
}
// The field works when it is itself kept inside a `RefCell`.
#[tokio::test]
async fn reactive_with_refcell_inside() {
let field = RefCell::new(Observable::new(0));
let subscription = field.borrow().when_eq(1);
*field.borrow_mut().borrow_mut() = 1;
timeout(Duration::from_millis(50), Box::pin(subscription))
.await
.unwrap()
.unwrap();
}
// `when_all_processed` is ready only while no delivered or pending value of
// a subscription is left unprocessed.
#[tokio::test]
async fn when_all_processed_works() {
let mut field = Progressable::new(1);
// No subscribers yet: ready both before and after a mutation.
assert_eq!(poll!(field.when_all_processed()), Poll::Ready(()));
*field.borrow_mut() = 2;
assert_eq!(poll!(field.when_all_processed()), Poll::Ready(()));
let mut subscribe = field.subscribe();
// The initial value of the subscription has not been taken yet.
assert_eq!(poll!(field.when_all_processed()), Poll::Pending);
assert_eq!(subscribe.next().await.unwrap().into_inner(), 2);
*field.borrow_mut() = 3;
assert_eq!(poll!(field.when_all_processed()), Poll::Pending);
assert_eq!(subscribe.next().await.unwrap().into_inner(), 3);
assert_eq!(poll!(field.when_all_processed()), Poll::Ready(()));
}
}