#![feature(trait_alias)]
use std::sync::{RwLock, Arc};
pub struct Observable<T, Error = ()>
where
T: Send + Sync + 'static,
Error: Send + Sync + 'static
{
subscriber: BoxedSubscriberFunction<T, Error>,
}
impl<T, Error> Observable<T, Error>
where
T: Send + Sync + 'static,
Error: Send + Sync + 'static
{
pub fn new<F, G>(subscriber: F) -> Self
where
F: Fn(SubscriptionObserver<T, Error>) -> G + Send + Sync + 'static,
G: Fn() + Send + Sync + 'static,
{
Self {
subscriber: Arc::new(move |subobserver| { Arc::new(subscriber(subobserver)) })
}
}
pub fn subscribe(&self, observer: impl Into<BoxedObserver<T, Error>>) -> Arc<Subscription<T, Error>> {
Subscription::new(observer.into(), Arc::clone(&self.subscriber))
}
pub fn map<U, F>(&self, map_fn: impl Fn(T) -> U + Send + Sync + 'static) -> Observable<U, Error>
where
U: Send + Sync + 'static,
F: SubscriberFunction<U, Error>,
{
let orig = self.clone();
let map_fn = Arc::new(map_fn);
Observable::<U, Error>::new(move |observer| {
let map_fn = map_fn.clone();
let observer = Arc::new(observer);
let subscription = orig.subscribe(observer! {
next: {
let observer = Arc::clone(&observer);
move |value: T| {
observer.next(map_fn(value));
}
},
error: {
let observer = Arc::clone(&observer);
move |error| {
observer.error(error);
}
},
complete: {
let observer = Arc::clone(&observer);
move || {
observer.complete();
}
},
});
move || {
subscription.unsubscribe();
}
})
}
pub fn filter<F>(&self, filter_fn: impl Fn(T) -> bool + 'static + Send + Sync) -> Observable<T, Error>
where
T: Clone,
F: SubscriberFunction<T, Error>,
{
let orig = self.clone();
let filter_fn = Arc::new(filter_fn);
Self::new(move |observer| {
let filter_fn = filter_fn.clone();
let observer = Arc::new(observer);
let subscription = orig.subscribe(observer! {
next: {
let observer = Arc::clone(&observer);
move |value: T| {
if filter_fn(value.clone()) {
observer.next(value);
}
}
},
error: {
let observer = Arc::clone(&observer);
move |error| {
observer.error(error);
}
},
complete: {
let observer = Arc::clone(&observer);
move || {
observer.complete();
}
},
});
move || {
subscription.unsubscribe();
}
})
}
}
impl<T, Iterable> From<Iterable> for Observable<T, ()>
where
Iterable: IntoIterator<Item = T> + Send + Sync,
T: Clone + Send + Sync + 'static
{
fn from(value: Iterable) -> Self {
let value = value.into_iter().collect::<Vec<T>>();
Self::new(move |observer| {
let cleanup = || {};
for item in &value {
observer.next(item.clone());
if observer.closed() {
return cleanup;
}
}
observer.complete();
cleanup
})
}
}
impl<T, Error> Clone for Observable<T, Error>
where
T: Send + Sync + 'static,
Error: Send + Sync + 'static,
{
fn clone(&self) -> Self {
Self {
subscriber: Arc::clone(&self.subscriber)
}
}
}
pub trait SubscriberFunction<T, Error = ()> = Fn(SubscriptionObserver<T, Error>) -> Arc<dyn SubscriptionCleanupFunction> + Sync + Send + 'static
where
T: Send + Sync + 'static,
Error: Send + Sync + 'static;
type BoxedSubscriberFunction<T, Error = ()> = Arc<(dyn SubscriberFunction<T, Error>)>;
pub trait SubscriptionCleanupFunction = Fn() + Sync + Send + 'static;
pub struct Subscription<T, Error = ()>
where
T: Send + Sync + 'static,
Error: Send + Sync + 'static
{
cleanup: RwLock<Option<Arc<dyn Fn() + Sync + Send>>>,
observer: SubscriptionObserverLock<T, Error>,
}
type SubscriptionObserverLock<T, Error> = RwLock<Option<Arc<RwLock<BoxedObserver<T, Error>>>>>;
impl<T, Error> Subscription<T, Error>
where
T: Send + Sync + 'static,
Error: Send + Sync + 'static
{
fn new(observer: BoxedObserver<T, Error>, subscriber: BoxedSubscriberFunction<T, Error>) -> Arc<Self> {
let this = Arc::new(Self {
cleanup: RwLock::new(None),
observer: RwLock::new(Some(Arc::new(RwLock::new(observer)))),
});
this.observer.read().unwrap().as_ref().unwrap().read().unwrap().start(Arc::clone(&this));
if subscription_closed(&this) {
return this;
}
let observer = SubscriptionObserver { subscription: Arc::clone(&this) };
let cleanup = subscriber(observer);
*this.cleanup.write().unwrap() = Some(Arc::clone(&cleanup));
if subscription_closed(&this) {
cleanup_subscription(&this);
}
this
}
pub fn closed(&self) -> bool {
subscription_closed(self)
}
pub fn unsubscribe(&self) {
close_subscription(self);
}
}
pub struct SubscriptionObserver<T, Error = ()>
where
T: Send + Sync + 'static,
Error: Send + Sync + 'static
{
subscription: Arc<Subscription<T, Error>>,
}
impl<T, Error> SubscriptionObserver<T, Error>
where
T: Send + Sync + 'static,
Error: Send + Sync + 'static
{
pub fn closed(&self) -> bool {
subscription_closed(&self.subscription)
}
pub fn next(&self, value: T) {
let subscription = Arc::clone(&self.subscription);
if subscription_closed(&subscription) {
return;
}
let observer = subscription.observer.read().unwrap().clone();
if observer.is_none() {
return;
}
observer.unwrap().read().unwrap().next(value);
}
pub fn error(&self, error: Error) {
let subscription = Arc::clone(&self.subscription);
if subscription_closed(&subscription) {
return;
}
let observer = subscription.observer.read().unwrap();
if let Some(o) = observer.as_ref().map(Arc::clone) {
drop(observer);
*subscription.observer.write().unwrap() = None;
o.read().unwrap().error(error);
} else {
}
cleanup_subscription(&subscription);
}
pub fn complete(&self) {
let subscription = Arc::clone(&self.subscription);
if subscription_closed(&subscription) {
return;
}
let observer = subscription.observer.read().unwrap();
if let Some(o) = observer.as_ref().map(Arc::clone) {
drop(observer);
*subscription.observer.write().unwrap() = None;
o.read().unwrap().complete();
}
cleanup_subscription(&subscription);
}
}
pub type BoxedObserver<T, Error = ()> = Box<dyn AbstractObserver<T, Error>>;
pub use rust_observable_literal::observer;
pub struct Observer<T, Error = ()>
where
T: Send + Sync + 'static,
Error: Send + Sync + 'static
{
pub next: Box<dyn Fn(T) + Sync + Send>,
pub error: Box<dyn Fn(Error) + Sync + Send>,
pub complete: Box<dyn Fn() + Sync + Send>,
pub start: Box<dyn ObserverStartFunction<T, Error>>,
}
pub trait ObserverStartFunction<T, Error> = Fn(Arc<Subscription<T, Error>>) + Sync + Send
where
T: Send + Sync + 'static,
Error: Send + Sync + 'static;
impl<T, Error> AbstractObserver<T, Error> for Observer<T, Error>
where
T: Send + Sync + 'static,
Error: Send + Sync + 'static
{
fn next(&self, value: T) {
(self.next)(value);
}
fn error(&self, error: Error) {
(self.error)(error);
}
fn complete(&self) {
(self.complete)();
}
fn start(&self, subscription: Arc<Subscription<T, Error>>) {
(self.start)(subscription);
}
}
impl<T, Error> Default for Observer<T, Error>
where
T: Send + Sync + 'static,
Error: Send + Sync + 'static
{
fn default() -> Self {
Self {
next: Box::new(|_| {}),
error: Box::new(|_| {}),
complete: Box::new(|| {}),
start: Box::new(|_| {}),
}
}
}
impl<T, Error> From<Observer<T, Error>> for BoxedObserver<T, Error>
where
T: Send + Sync + 'static,
Error: Send + Sync + 'static
{
fn from(value: Observer<T, Error>) -> Self {
Box::new(value)
}
}
pub trait AbstractObserver<T, Error = ()>: Send + Sync
where
T: Send + Sync + 'static,
Error: Send + Sync + 'static
{
fn next(&self, value: T) {
let _ = value;
}
fn error(&self, error: Error) {
let _ = error;
}
fn complete(&self) {}
fn start(&self, subscription: Arc<Subscription<T, Error>>) {
let _ = subscription;
}
}
fn cleanup_subscription<T, Error>(subscription: &Subscription<T, Error>)
where
T: Send + Sync + 'static,
Error: Send + Sync + 'static
{
assert!(subscription.observer.read().unwrap().is_none());
let cleanup = subscription.cleanup.read().unwrap().clone();
if cleanup.is_none() {
return;
}
let cleanup = Arc::clone(&cleanup.unwrap());
*subscription.cleanup.write().unwrap() = None;
cleanup();
}
fn subscription_closed<T, Error>(subscription: &Subscription<T, Error>) -> bool
where
T: Send + Sync + 'static,
Error: Send + Sync + 'static
{
let observer = subscription.observer.read().unwrap().clone();
observer.is_none()
}
fn close_subscription<T, Error>(subscription: &Subscription<T, Error>)
where
T: Send + Sync + 'static,
Error: Send + Sync + 'static
{
if subscription_closed(subscription) {
return;
}
*subscription.observer.write().unwrap() = None;
cleanup_subscription(subscription);
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn subscription() {
let list = Arc::new(RwLock::new(vec![]));
Observable::<_, ()>::new(|observer| {
for color in ["red", "green", "blue"] {
observer.next(color.to_owned());
}
|| {
}
})
.subscribe(observer! {
next: {
let list = Arc::clone(&list);
move |color| {
list.write().unwrap().push(color);
}
},
});
assert_eq!(
*list.read().unwrap(),
Vec::from_iter(["red", "green", "blue"])
);
let list = Arc::new(RwLock::new(vec![]));
Observable::from(Vec::from_iter(["red", "green", "blue"]))
.subscribe(observer! {
next: {
let list = Arc::clone(&list);
move |color| {
list.write().unwrap().push(color);
}
},
});
assert_eq!(
*list.read().unwrap(),
Vec::from_iter(["red", "green", "blue"])
);
}
}