use super::dispatcher::{replicate, DispatchTarget, Dispatcher, DispatcherType, Invoker, SubscriptionDispatcher};
use super::scheduler::{make_scheduler, Scheduler, SchedulerType};
use super::subscription::Subscription;
use crate::sync::threadpool::Task;
use log::warn;
use std::fmt::Debug;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock, Weak};
/// Marker trait for values that may travel through an observable.
///
/// It declares no methods; it only bundles the bounds
/// `Send + Sync + Clone + Debug + 'static` under one name.
pub trait ObservableType: Send + Sync + Clone + Debug + 'static {}

// Blanket implementation: every type that satisfies the bounds is an
// `ObservableType` automatically, so callers never implement it by hand.
impl<T: Send + Sync + Clone + Debug + 'static> ObservableType for T {}
/// A node in the ownership chain of observables, used as `dyn Owner`.
///
/// The first five methods must be provided by every implementor; the last
/// three come with defaults.
pub(super) trait Owner: Send + Sync {
    /// Numeric identifier of this owner.
    fn id(&self) -> usize;

    /// Scheduler associated with this owner.
    fn scheduler(&self) -> Arc<dyn Scheduler>;

    /// Registers `task` as a finalizer on this owner.
    fn add_finalize(&self, task: Task);

    /// Lets the owner react to a [`Signal`].
    fn handle(&self, signal: Signal);

    /// Finishes this owner.
    fn finish(&self);

    /// Whether the owner has finished. Defaults to `false`.
    fn finished(&self) -> bool {
        false
    }

    /// The owner one level up, if any. Defaults to `None`.
    fn owner(&self) -> Option<Arc<dyn Owner>> {
        None
    }

    /// Initialization hook. Does nothing by default.
    fn initialize(&self) {}
}
/// Placeholder [`Owner`] whose explicitly defined methods all panic with
/// `unimplemented!()`.
///
/// Within this file it is only named as the concrete type of the empty
/// `Weak` that `Observable::new` stores when no owner is supplied.
/// `finished` is not overridden, so the trait default applies to it.
pub(super) struct DummyOwner;

impl Owner for DummyOwner {
    fn id(&self) -> usize {
        unimplemented!()
    }

    fn scheduler(&self) -> Arc<dyn Scheduler> {
        unimplemented!()
    }

    fn add_finalize(&self, _task: Task) {
        unimplemented!()
    }

    fn handle(&self, _signal: Signal) {
        unimplemented!()
    }

    fn finish(&self) {
        unimplemented!()
    }

    fn owner(&self) -> Option<Arc<dyn Owner>> {
        unimplemented!()
    }

    fn initialize(&self) {
        unimplemented!()
    }
}
/// Type-erased callback with no arguments that can be shared across threads.
type ResolverFn = dyn Fn() + Send + Sync;

/// Clonable handle around a shared [`ResolverFn`]; clones share the same
/// underlying callback through the `Arc`.
#[derive(Clone)]
struct PipeResolver {
    func: Arc<ResolverFn>,
}

impl PipeResolver {
    /// Wraps an already shared callback.
    fn new(func: Arc<ResolverFn>) -> Self {
        Self { func }
    }

    /// Runs the wrapped callback once.
    fn invoke(&self) {
        let callback: &ResolverFn = &*self.func;
        callback()
    }
}
/// Shared, lock-protected, boxed dispatcher for values of type `T`.
type Pipeable<T> = Arc<RwLock<Box<dyn Dispatcher<T>>>>;

/// Handle used to chain stages onto an observable.
pub struct Pipe<T: ObservableType> {
    /// Dispatcher the deferred resolver will attach to. It is `Some` for a
    /// pipe created by `Observable::pipe` and is taken by `attach` when no
    /// resolver exists yet.
    destination: Option<Pipeable<T>>,
    /// Dispatcher that the next attached stage becomes a child of.
    pub(super) next: Pipeable<T>,
    /// Deferred hookup, run by `instantiate`.
    resolver: Option<PipeResolver>,
    /// Set once `attach` or `forward` has been called on this pipe;
    /// `instantiate` panics when it is set.
    dead: bool,
}
impl<T> Pipe<T>
where
    T: ObservableType,
{
    /// Assembles a pipe that has not been consumed yet (`dead == false`).
    fn new(destination: Option<Pipeable<T>>, next: Pipeable<T>, resolver: Option<PipeResolver>) -> Self {
        Pipe {
            destination,
            next,
            resolver,
            dead: false,
        }
    }

    /// Chains `observable` behind this pipe through `target` and returns the
    /// pipe for the new stage.
    ///
    /// This pipe is marked dead. Two cases:
    ///
    /// * A resolver already exists: `target` is added as a child of `next`
    ///   right away, any bootstrap task returned for `observable` is invoked
    ///   inline, and the existing resolver is passed on.
    /// * No resolver yet: `destination` is taken and a new resolver is
    ///   created. When that resolver is invoked it adds a clone of `target`
    ///   to the taken destination and hands any bootstrap task to the
    ///   scheduler captured here.
    ///
    /// # Panics
    ///
    /// Panics if a lock is poisoned, or, in the second case, if
    /// `destination` is `None` or `next` has no owner.
    pub(super) fn attach<A>(&mut self, observable: Arc<Observable<A>>, target: DispatchTarget<T>) -> Pipe<A>
    where
        A: ObservableType,
    {
        self.dead = true;
        let resolver = if self.resolver.is_some() {
            self.next.write().unwrap().add_child(target);
            // The read guard in the scrutinee stays alive while the task runs.
            if let Some(task) = self.next.read().unwrap().bootstrap(observable.id()) {
                task.invoke();
            }
            self.resolver.clone()
        } else {
            let destination = self.destination.take().unwrap();
            let cloned = observable.clone();
            let scheduler = self.scheduler();
            Some(PipeResolver::new(Arc::new(move || {
                destination.write().unwrap().add_child(target.clone());
                if let Some(task) = destination.read().unwrap().bootstrap(cloned.id()) {
                    scheduler.execute(task);
                }
            })))
        };
        Pipe::<A>::new(None, observable.pipeable.clone(), resolver)
    }

    /// Marks this pipe dead and returns a fresh pipe sharing the same
    /// destination, next dispatcher and resolver.
    pub(super) fn forward(&mut self) -> Self {
        self.dead = true;
        Pipe::new(self.destination.clone(), self.next.clone(), self.resolver.clone())
    }

    /// Runs the deferred resolver, if there is one; otherwise logs a warning.
    ///
    /// # Panics
    ///
    /// Panics if this pipe has already been consumed by `attach` or `forward`.
    pub(super) fn instantiate(&self) {
        if self.dead {
            panic!("observable pipe instantiated twice");
        }
        if let Some(resolver) = &self.resolver {
            resolver.invoke();
        } else {
            warn!("attempted to instantiate empty pipe");
        }
    }

    /// Scheduler of the owner of the `next` dispatcher.
    ///
    /// # Panics
    ///
    /// Panics if the lock is poisoned or the dispatcher reports no owner.
    pub(super) fn scheduler(&self) -> Arc<dyn Scheduler> {
        self.next.read().unwrap().owner().unwrap().scheduler()
    }

    /// Creates a new observable with a fresh id, owned by the owner of `next`,
    /// using a replica of the `next` dispatcher and that owner's scheduler.
    ///
    /// # Panics
    ///
    /// Panics if the lock is poisoned or the dispatcher reports no owner.
    pub(super) fn make_observer<A>(&self) -> Arc<Observable<A>>
    where
        A: ObservableType,
    {
        let guard = self.next.read().unwrap();
        Observable::new(
            id(),
            Some(guard.owner().unwrap().clone()),
            replicate(guard.as_replicable()),
            guard.owner().unwrap().scheduler(),
        )
    }

    /// Terminates the chain with a pass-through observable, instantiates the
    /// pipe and returns that observable.
    pub fn observe(&mut self) -> Arc<Observable<T>> {
        let observable = self.make_observer();
        let pipe = self.attach(
            observable.clone(),
            DispatchTarget::new(observable.clone(), Invoker::identity(observable.clone())),
        );
        pipe.instantiate();
        observable
    }

    /// Terminates the chain with `consumer`, instantiates the pipe and returns
    /// a [`Subscription`] holding a weak reference to the terminal observable.
    ///
    /// # Panics
    ///
    /// Panics if the lock is poisoned or the `next` dispatcher has no owner.
    pub fn subscribe<F>(&mut self, consumer: F) -> Subscription
    where
        F: Fn(T) + Send + Sync + 'static,
    {
        // Looking up the owner is a read-only query, so a shared lock is
        // enough here, matching `scheduler` and `make_observer`.
        let owner = self.next.read().unwrap().owner().unwrap();
        let observable = Observable::<T>::new(
            id(),
            Some(owner.clone()),
            Box::new(SubscriptionDispatcher::new(Invoker::new(Arc::new(move |x| {
                consumer(x);
                Signal::None
            })))),
            owner.scheduler(),
        );
        let pipe = self.attach(
            observable.clone(),
            DispatchTarget::new(observable.clone(), Invoker::identity(observable.clone())),
        );
        pipe.instantiate();
        Subscription::new(Arc::downgrade(&observable) as Weak<dyn Owner>)
    }

    /// Registers `task` as a finalizer on the `next` dispatcher, then attaches
    /// a pass-through observable and returns the pipe for it. The returned
    /// pipe is not instantiated here.
    pub fn finalize<F>(&mut self, task: F) -> Pipe<T>
    where
        F: Fn() + Send + Sync + 'static,
    {
        self.next.read().unwrap().add_finalize(Task::new(Box::new(task)));
        let observable = self.make_observer();
        self.attach(
            observable.clone(),
            DispatchTarget::new(observable.clone(), Invoker::identity(observable)),
        )
    }
}
/// Message passed to `Owner::handle`.
#[derive(PartialEq, Copy, Clone)]
pub(super) enum Signal {
    /// Nothing to do.
    None,
    /// Asks the receiver to recycle the child identified by the given id.
    Recycle(usize),
}

impl Signal {
    /// Returns `true` only for [`Signal::None`].
    pub(super) fn is_none(&self) -> bool {
        matches!(self, Signal::None)
    }
}
/// A stream stage that forwards values of type `T` through its dispatcher.
pub struct Observable<T: ObservableType> {
    /// Identifier reported by `Owner::id`.
    id: usize,
    /// Shared dispatcher that values and lifecycle calls are forwarded to.
    pipeable: Pipeable<T>,
    /// Weak link to the owning node; never upgradable when the observable
    /// was created without an owner.
    owner: Weak<dyn Owner>,
    /// Scheduler handed out by `Owner::scheduler`.
    scheduler: Arc<dyn Scheduler>,
    /// Flipped to `true` by the first `finish` call.
    finished: AtomicBool,
}
impl<T: ObservableType> Owner for Observable<T> {
    /// Identifier given at construction.
    fn id(&self) -> usize {
        self.id
    }

    /// Marks the observable finished; only the call that flips the flag
    /// finalizes the dispatcher, later calls return immediately.
    fn finish(&self) {
        let flipped_now = self
            .finished
            .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
            .is_ok();
        if !flipped_now {
            return;
        }
        self.pipeable.read().unwrap().finalize();
    }

    /// Current value of the finished flag.
    fn finished(&self) -> bool {
        self.finished.load(Ordering::Relaxed)
    }

    /// A clone of the scheduler handle held by this observable.
    fn scheduler(&self) -> Arc<dyn Scheduler> {
        Arc::clone(&self.scheduler)
    }

    /// Forwards the finalizer to the dispatcher.
    fn add_finalize(&self, task: Task) {
        self.pipeable.read().unwrap().add_finalize(task);
    }

    /// On `Recycle(id)`: removes that child from the dispatcher, finishes it
    /// if it existed, and finishes this observable once no children remain.
    /// `Signal::None` is ignored.
    fn handle(&self, signal: Signal) {
        if let Signal::Recycle(id) = signal {
            {
                // The write guard is released at the end of this statement,
                // before the removed child is finished.
                let removed = self.pipeable.write().unwrap().remove_child(id);
                if let Some(child) = removed {
                    child.finish();
                }
            }
            if self.num_children() == 0 {
                self.finish();
            }
        }
    }

    /// Upgrades the weak owner link; `None` when it cannot be upgraded.
    fn owner(&self) -> Option<Arc<dyn Owner>> {
        self.owner.upgrade()
    }

    /// Forwards initialization to the dispatcher.
    fn initialize(&self) {
        self.pipeable.read().unwrap().initialize();
    }
}
// Dropping an observable finishes it; `finish` itself makes sure the
// dispatcher is finalized at most once.
impl<T: ObservableType> Drop for Observable<T> {
    fn drop(&mut self) {
        self.finish();
    }
}
/// Returns a fresh identifier from a process-wide counter.
///
/// The counter starts at `0` and each call returns the value before the
/// increment, so consecutive calls yield distinct, increasing ids (wrapping
/// around on `usize` overflow, as `fetch_add` does).
pub(super) fn id() -> usize {
    // An atomic is interior-mutable, so a plain `static` suffices; no
    // `static mut` and no `unsafe` block are required.
    static ID: AtomicUsize = AtomicUsize::new(0);
    ID.fetch_add(1, Ordering::Relaxed)
}
/// How `ObservableBuilder::build` produces its observable.
enum ObservableStrategy<T: ObservableType> {
    /// Emit the listed values.
    Of(Vec<T>),
    /// Funnel the listed observables into one.
    Merge(Vec<Arc<Observable<T>>>),
}
/// Builder collecting the scheduler kind, dispatcher kind and source
/// strategy for a new observable.
pub struct ObservableBuilder<T: ObservableType> {
    /// Kind of scheduler passed to `make_scheduler` on build.
    scheduler: SchedulerType,
    /// Kind of dispatcher created on build.
    dispatcher: DispatcherType,
    /// Where the observable's values come from.
    strategy: ObservableStrategy<T>,
}
impl<T: ObservableType> ObservableBuilder<T> {
    /// Starts a builder that emits `list`, using a worker scheduler and a
    /// replay dispatcher sized to the list length.
    pub fn of(list: Vec<T>) -> Self {
        let replay_len = list.len();
        Self {
            scheduler: SchedulerType::Worker,
            dispatcher: DispatcherType::Replay(replay_len),
            strategy: ObservableStrategy::Of(list),
        }
    }

    /// Starts a builder that merges `list`, using a worker scheduler and a
    /// basic dispatcher.
    pub fn merge(list: Vec<Arc<Observable<T>>>) -> Self {
        Self {
            scheduler: SchedulerType::Worker,
            dispatcher: DispatcherType::Basic,
            strategy: ObservableStrategy::Merge(list),
        }
    }

    /// Overrides the scheduler kind.
    pub fn scheduler(self, scheduler: SchedulerType) -> Self {
        Self { scheduler, ..self }
    }

    /// Overrides the dispatcher kind.
    pub fn dispatcher(self, dispatcher: DispatcherType) -> Self {
        Self { dispatcher, ..self }
    }

    /// Creates the observable described by this builder.
    ///
    /// For `Of`, a task that feeds every listed value into the observable is
    /// handed to the scheduler. For `Merge`, a `Funnel` is created over the
    /// sources and its target observable is returned.
    pub fn build(self) -> Arc<Observable<T>> {
        let observable_id = id();
        let scheduler = make_scheduler("observable".to_owned(), observable_id, self.scheduler);
        let dispatcher = super::dispatcher::create(self.dispatcher);
        match self.strategy {
            ObservableStrategy::Of(items) => {
                let observable = Observable::new(observable_id, None, dispatcher, scheduler.clone());
                let emitter = observable.clone();
                scheduler.execute(Task::new(move || {
                    items.iter().cloned().for_each(|item| emitter.next(item));
                }));
                observable
            }
            ObservableStrategy::Merge(sources) => {
                let (_funnel, merged) = Funnel::new(sources, observable_id, scheduler, dispatcher);
                merged
            }
        }
    }
}
impl<T: ObservableType> Observable<T> {
    /// Creates an observable and registers it as the owner of its dispatcher.
    ///
    /// When `owner` is `None` the stored owner link is an empty `Weak` that
    /// can never be upgraded.
    pub(super) fn new(
        id: usize,
        owner: Option<Arc<dyn Owner>>,
        dispatcher: Box<dyn Dispatcher<T>>,
        scheduler: Arc<dyn Scheduler>,
    ) -> Arc<Self> {
        // Borrow rather than consume `owner`, so the strong reference passed
        // in stays alive until this function returns.
        let parent_link: Weak<dyn Owner> = match owner.as_ref() {
            Some(parent) => Arc::downgrade(parent),
            None => Weak::<DummyOwner>::new(),
        };
        let observable = Arc::new(Self {
            id,
            pipeable: Arc::new(RwLock::new(dispatcher)),
            owner: parent_link,
            scheduler,
            finished: AtomicBool::new(false),
        });
        let as_owner = observable.clone() as Arc<dyn Owner>;
        observable.pipeable.write().unwrap().set_owner(as_owner);
        observable
    }

    /// Shorthand for `ObservableBuilder::merge(list).build()`.
    pub fn merge(list: Vec<Arc<Observable<T>>>) -> Arc<Self> {
        let builder = ObservableBuilder::merge(list);
        builder.build()
    }

    /// Shorthand for `ObservableBuilder::of(list).build()`.
    pub fn of(list: Vec<T>) -> Arc<Self> {
        let builder = ObservableBuilder::of(list);
        builder.build()
    }

    /// Opens a pipe whose destination and next dispatcher are both this
    /// observable's dispatcher, with no resolver yet.
    pub fn pipe(&self) -> Pipe<T> {
        let shared = &self.pipeable;
        Pipe::new(Some(shared.clone()), shared.clone(), None)
    }

    /// Number of children reported by the dispatcher.
    pub fn num_children(&self) -> usize {
        let dispatcher = self.pipeable.read().unwrap();
        dispatcher.num_children()
    }

    /// Finishes this observable, then submits a closure to the runtime that
    /// asks the owner (if it is still alive at this point) to recycle this
    /// observable's id.
    pub fn unsubscribe(&self) {
        self.finish();
        let (owner, id) = (self.owner(), self.id());
        crate::sync::runtime::Runtime::submit(move || {
            if let Some(owner) = owner.clone() {
                owner.handle(Signal::Recycle(id));
            }
        });
    }

    /// Dispatches `value` unless the observable is finished, then handles
    /// every signal the dispatch returned.
    pub(super) fn next(&self, value: T) {
        if self.finished() {
            return;
        }
        // The read guard is released at the end of this statement, before
        // any returned signal is handled.
        let signals = self.pipeable.read().unwrap().dispatch(value);
        if let Some(signals) = signals {
            for &signal in signals.iter() {
                self.handle(signal);
            }
        }
    }

    /// Current value of the finished flag.
    pub(super) fn finished(&self) -> bool {
        self.finished.load(Ordering::Relaxed)
    }
}
/// Joins several source observables into a single target observable.
pub struct Funnel<T: ObservableType> {
    /// Identifier used in the `Recycle` signals this funnel sends.
    id: usize,
    /// Weak links to the source observables.
    owners: Vec<Weak<dyn Owner>>,
    /// The observable that receives forwarded values; filled in by `new`.
    target: RwLock<Option<Arc<Observable<T>>>>,
    /// Flag read by `finished`.
    /// NOTE(review): nothing visible in this file ever stores `true` here;
    /// confirm whether that is intended.
    finished: AtomicBool,
}
impl<T: ObservableType> Owner for Funnel<T> {
    /// Identifier given at construction.
    fn id(&self) -> usize {
        self.id
    }

    /// Sends `Recycle(self.id())` to every source that is still alive.
    fn finish(&self) {
        self.owners
            .iter()
            .filter_map(|source| source.upgrade())
            .for_each(|source| source.handle(Signal::Recycle(self.id())));
    }

    /// Current value of the finished flag.
    fn finished(&self) -> bool {
        self.finished.load(Ordering::Relaxed)
    }

    /// Scheduler of the first source that is still alive, or a blocking
    /// scheduler when none is.
    fn scheduler(&self) -> Arc<dyn Scheduler> {
        match self.owner() {
            Some(source) => source.scheduler(),
            None => Arc::new(super::scheduler::Blocking {}),
        }
    }

    /// Not expected to be called on a funnel; panics via `unreachable!`.
    fn add_finalize(&self, _task: Task) {
        unreachable!();
    }

    /// Any `Recycle` signal triggers a recycle without a caller; other
    /// signals are ignored.
    fn handle(&self, signal: Signal) {
        match signal {
            Signal::Recycle(_) => self.recycle(None),
            Signal::None => (),
        }
    }

    /// First source whose weak link can still be upgraded.
    fn owner(&self) -> Option<Arc<dyn Owner>> {
        self.owners.iter().find_map(|source| source.upgrade())
    }
}
impl<T: ObservableType> Funnel<T> {
    /// Creates a funnel over `owners` and the target observable it feeds.
    ///
    /// The target gets id `child_id`, the funnel as its owner, and the given
    /// dispatcher and scheduler. Every source receives a child whose invoker
    /// forwards values into the funnel together with a weak link to that
    /// source.
    pub(super) fn new(
        owners: Vec<Arc<Observable<T>>>,
        child_id: usize,
        scheduler: Arc<dyn Scheduler>,
        dispatcher: Box<dyn Dispatcher<T>>,
    ) -> (Arc<Self>, Arc<Observable<T>>) {
        let weak_sources: Vec<Weak<dyn Owner>> = owners
            .iter()
            .map(|source| Arc::downgrade(source) as Weak<dyn Owner>)
            .collect();
        let funnel = Arc::new(Funnel {
            id: id(),
            owners: weak_sources,
            target: RwLock::new(None),
            finished: AtomicBool::new(false),
        });
        let observable = Observable::new(child_id, Some(funnel.clone()), dispatcher, scheduler);
        *funnel.target.write().unwrap() = Some(observable.clone());
        for source in &owners {
            let capture = funnel.clone();
            let source_weak = Arc::downgrade(source);
            let forwarder = Invoker::new(Arc::new(move |x| {
                capture.next(x, source_weak.clone());
                Signal::None
            }));
            let child = DispatchTarget::new(funnel.clone(), forwarder);
            source.pipeable.write().unwrap().add_child(child);
        }
        (funnel, observable)
    }

    /// Forwards `value` to the target unless the funnel reports finished; if
    /// the target is finished afterwards, recycles on behalf of `caller`.
    pub(super) fn next(&self, value: T, caller: Weak<dyn Owner>) {
        if self.finished() {
            return;
        }
        // Two separate, short-lived read locks: neither is held while
        // `recycle` runs.
        self.target.read().unwrap().as_ref().unwrap().next(value);
        if self.target.read().unwrap().as_ref().unwrap().finished() {
            self.recycle(Some(caller));
        }
    }

    /// Calls `finish`, then sends `Recycle(self.id())` to every live source
    /// except the one identified by `caller`. With no caller, or one that
    /// can no longer be upgraded, every live source is signalled.
    pub(super) fn recycle(&self, caller: Option<Weak<dyn Owner>>) {
        self.finish();
        let caller_id = caller.and_then(|weak| weak.upgrade()).map(|strong| strong.id());
        for source in self.owners.iter() {
            if let Some(source) = source.upgrade() {
                let should_signal = caller_id.map_or(true, |skip| skip != source.id());
                if should_signal {
                    source.handle(Signal::Recycle(self.id()));
                }
            }
        }
    }
}
/// Helpers shared with other test modules.
#[cfg(test)]
pub mod testing {
    use super::*;
    use crate::event::dispatcher::{create, DispatcherType};
    use crate::event::scheduler::{make_scheduler, SchedulerType};

    /// Builds an owner-less observable with a fresh id, a scheduler of kind
    /// `strategy` and a dispatcher of kind `dispatcher`.
    pub fn mock_observable<T>(strategy: SchedulerType, dispatcher: DispatcherType) -> Arc<Observable<T>>
    where
        T: ObservableType,
    {
        let observable_id = id();
        let scheduler = make_scheduler("observable".to_owned(), observable_id, strategy);
        Observable::new(observable_id, None, create(dispatcher), scheduler)
    }
}
// Unit tests for id generation, `Observable::of` and `Funnel` lifetime handling.
#[cfg(test)]
mod test {
use super::*;
use crate::event::dispatcher::DispatcherType;
// Two consecutive `id()` calls must never return the same value.
#[test]
fn id_unique_test() {
for _ in 0..100 {
assert_ne!(id(), id());
}
}
// `Observable::of` picks a replay dispatcher sized to the input list.
#[test]
fn observable_of_test() {
let observable = Observable::of(vec![1, 2, 3]);
assert_eq!(
observable.pipeable.read().unwrap().get_type(),
DispatcherType::Replay(3)
);
}
// A funnel registers one child on every source and is released once the
// sources go away, even while the target observable is still held.
#[test]
fn funnel_new_test() {
crate::utils::testing::async_context(|| {
let (funnel, _into) = {
let observables: Vec<Arc<Observable<()>>> =
vec![Observable::of(vec![]), Observable::of(vec![]), Observable::of(vec![])];
let (funnel, into) = {
let (funnel, observable) = Funnel::new(
observables.clone(),
id(),
Arc::new(crate::event::scheduler::Blocking {}),
crate::event::dispatcher::create(DispatcherType::Basic),
);
assert_eq!(funnel.owners.len(), 3);
// Only a weak handle leaves this scope, so the test itself does not keep the funnel alive.
(Arc::downgrade(&funnel), observable)
};
for observer in observables.iter() {
assert_eq!(observer.num_children(), 1);
}
assert!(funnel.upgrade().is_some());
(funnel, into)
// `observables` goes out of scope here.
};
// Short pause before checking that the funnel has been released.
std::thread::sleep(std::time::Duration::from_millis(20));
assert!(funnel.upgrade().is_none());
});
}
// Unsubscribing the target observable releases the funnel and removes its
// child from every source.
#[test]
fn funnel_target_unsubscribe_test() {
crate::utils::testing::async_context(|| {
let observables: Vec<Arc<Observable<()>>> =
vec![Observable::of(vec![]), Observable::of(vec![]), Observable::of(vec![])];
let (funnel, into) = {
let (funnel, observable) = Funnel::new(
observables.clone(),
id(),
Arc::new(crate::event::scheduler::Blocking {}),
crate::event::dispatcher::create(DispatcherType::Basic),
);
assert_eq!(funnel.owners.len(), 3);
(Arc::downgrade(&funnel), observable)
};
for observer in observables.iter() {
assert_eq!(observer.num_children(), 1);
}
assert!(funnel.upgrade().is_some());
into.unsubscribe();
// Spin until the runtime reports that it is done.
while !crate::sync::runtime::Runtime::done() {}
assert!(funnel.upgrade().is_none());
for observer in observables.iter() {
assert_eq!(observer.num_children(), 0);
}
});
}
}