use std::marker::PhantomData;
use crate::simulation;
use crate::simulation::point::Point;
use crate::simulation::event::*;
use crate::simulation::ref_comp::RefComp;
use dvcompute_utils::grc::Grc;
pub mod disposable;
pub mod observer;
pub mod source;
pub mod random;
pub mod history;
pub mod ops;
use self::disposable::*;
use self::observer::*;
use self::source::*;
#[inline]
pub fn empty_observable<T>() -> Empty<T> {
Empty { phantom: PhantomData }
}
#[inline]
pub fn delay_observable<F, M>(f: F) -> Delay<F, M>
where F: FnOnce() -> M + 'static,
M: Observable + 'static
{
Delay { f: f, _phantom: PhantomData }
}
#[inline]
pub fn new_observable_in_integ_times() -> NewInIntegTimes {
NewInIntegTimes {}
}
#[inline]
pub fn new_observable_source_in_integ_times() -> NewSourceInIntegTimes {
NewSourceInIntegTimes {}
}
#[inline]
pub fn new_observable_in_start_time() -> NewInStartTime {
NewInStartTime {}
}
#[inline]
pub fn new_observable_source_in_start_time() -> NewSourceInStartTime {
NewSourceInStartTime {}
}
#[inline]
pub fn new_observable_in_stop_time() -> NewInStopTime {
NewInStopTime {}
}
#[inline]
pub fn new_observable_source_in_stop_time() -> NewSourceInStopTime {
NewSourceInStopTime {}
}
#[inline]
pub fn trace_observable<M>(msg: String, comp: M) -> Trace<M>
where M: Observable
{
Trace { comp: comp, msg: msg}
}
pub trait Observable {
type Message;
fn subscribe<O>(self, observer: O) -> EventBox<DisposableBox>
where O: Observer<Message = Self::Message, Item = ()> + 'static;
#[inline]
fn map<B, F>(self, f: F) -> Map<Self, B, F>
where Self: Sized,
F: Fn(&Self::Message) -> B + 'static,
Self::Message: 'static,
B: 'static
{
Map { comp: self, f: f, phantom: PhantomData }
}
#[inline]
fn mapc<U, B, F>(self, f: F) -> MapC<Self, U, B, F>
where Self: Sized,
F: Fn(&Self::Message) -> U + 'static,
U: Event<Item = B>,
Self::Message: 'static,
B: 'static
{
MapC { comp: self, f: f, _phantom1: PhantomData, _phantom2: PhantomData }
}
#[inline]
fn filter<F>(self, f: F) -> Filter<Self, F>
where Self: Sized,
F: Fn(&Self::Message) -> bool + 'static,
Self::Message: 'static
{
Filter { comp: self, f: f }
}
#[inline]
fn filterc<U, F>(self, f: F) -> FilterC<Self, U, F>
where Self: Sized,
F: Fn(&Self::Message) -> U + 'static,
U: Event<Item = bool>,
Self::Message: 'static
{
FilterC { comp: self, f: f, _phantom: PhantomData }
}
#[inline]
fn hold(self, dt: f64) -> Hold<Self>
where Self: Sized,
Self::Message: Clone + 'static
{
Hold { comp: self, dt: dt }
}
#[inline]
fn holdc<U, F>(self, f: F) -> HoldC<Self, U, F>
where Self: Sized,
Self::Message: Clone + 'static,
F: Fn() -> U + 'static,
U: Event<Item = f64>
{
HoldC { comp: self, f: f, _phantom: PhantomData }
}
#[inline]
fn merge<U>(self, other: U) -> Merge<Self, U>
where Self: Sized,
Self::Message: 'static,
U: Observable<Message = Self::Message>
{
Merge { comp: self, other: other }
}
#[inline]
fn into_boxed(self) -> ObservableBox<Self::Message>
where Self: Sized + 'static
{
ObservableBox::new(move |observer: ObserverBox<Self::Message, ()>| {
self.subscribe(observer.into_observer())
})
}
}
pub trait IntoObservable {
type Observable: Observable<Message = Self::Message>;
type Message;
fn into_observable(self) -> Self::Observable;
}
impl<M: Observable> IntoObservable for M {
type Observable = M;
type Message = M::Message;
#[inline]
fn into_observable(self) -> Self::Observable {
self
}
}
#[must_use = "computations are lazy and do nothing unless to be run"]
pub struct ObservableBox<T> {
f: Box<dyn ObservableFnBox<T>>
}
impl<T> ObservableBox<T> {
#[doc(hidden)]
#[inline]
fn new<F>(f: F) -> Self
where F: FnOnce(ObserverBox<T, ()>) -> EventBox<DisposableBox> + 'static
{
ObservableBox {
f: Box::new(f)
}
}
#[doc(hidden)]
#[inline]
pub fn call_box(self, arg: (ObserverBox<T, ()>,)) -> EventBox<DisposableBox> {
let ObservableBox { f } = self;
f.call_box(arg)
}
}
impl<T> Observable for ObservableBox<T> {
type Message = T;
#[inline]
fn subscribe<O>(self, observer: O) -> EventBox<DisposableBox>
where O: Observer<Message = Self::Message, Item = ()> + 'static
{
self.call_box((observer.into_boxed(),))
}
#[inline]
fn into_boxed(self) -> ObservableBox<Self::Message>
where Self: Sized + 'static
{
self
}
}
trait ObservableFnBox<T> {
fn call_box(self: Box<Self>, args: (ObserverBox<T, ()>,)) -> EventBox<DisposableBox>;
}
impl<T, F> ObservableFnBox<T> for F
where F: FnOnce(ObserverBox<T, ()>) -> EventBox<DisposableBox>
{
fn call_box(self: Box<Self>, args: (ObserverBox<T, ()>,)) -> EventBox<DisposableBox> {
let this: Self = *self;
this(args.0)
}
}
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct Map<M, B, F> {
comp: M,
f: F,
phantom: PhantomData<B>
}
impl<M, B, F> Observable for Map<M, B, F>
where M: Observable,
F: Fn(&M::Message) -> B + 'static,
M::Message: 'static,
B: 'static
{
type Message = B;
#[inline]
fn subscribe<O>(self, observer: O) -> EventBox<DisposableBox>
where O: Observer<Message = Self::Message, Item = ()> + 'static
{
let comp = self.comp;
let f = self.f;
let observer = cons_observer(move |m: &M::Message, p: &Point| {
let m2 = f(m);
observer.call_observer(&m2, p)
});
comp.subscribe(observer)
}
}
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct MapC<M, U, B, F> {
comp: M,
f: F,
_phantom1: PhantomData<U>,
_phantom2: PhantomData<B>
}
impl<M, U, B, F> Observable for MapC<M, U, B, F>
where M: Observable,
F: Fn(&M::Message) -> U + 'static,
U: Event<Item = B>,
M::Message: 'static,
B: 'static
{
type Message = B;
#[inline]
fn subscribe<O>(self, observer: O) -> EventBox<DisposableBox>
where O: Observer<Message = Self::Message, Item = ()> + 'static
{
let comp = self.comp;
let f = self.f;
let observer = cons_observer(move |m: &M::Message, p: &Point| {
match f(m).call_event(p) {
Result::Err(e) => Result::Err(e),
Result::Ok(m2) => observer.call_observer(&m2, p)
}
});
comp.subscribe(observer)
}
}
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct Filter<M, F> {
comp: M,
f: F
}
impl<M, F> Observable for Filter<M, F>
where M: Observable,
F: Fn(&M::Message) -> bool + 'static,
M::Message: 'static
{
type Message = M::Message;
#[inline]
fn subscribe<O>(self, observer: O) -> EventBox<DisposableBox>
where O: Observer<Message = Self::Message, Item = ()> + 'static
{
let comp = self.comp;
let f = self.f;
let observer = cons_observer(move |m: &M::Message, p: &Point| {
if f(m) {
observer.call_observer(m, p)
} else {
Result::Ok(())
}
});
comp.subscribe(observer)
}
}
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct FilterC<M, U, F> {
comp: M,
f: F,
_phantom: PhantomData<U>
}
impl<M, U, F> Observable for FilterC<M, U, F>
where M: Observable,
F: Fn(&M::Message) -> U + 'static,
U: Event<Item = bool>,
M::Message: 'static
{
type Message = M::Message;
#[inline]
fn subscribe<O>(self, observer: O) -> EventBox<DisposableBox>
where O: Observer<Message = Self::Message, Item = ()> + 'static
{
let comp = self.comp;
let f = self.f;
let observer = cons_observer(move |m: &M::Message, p: &Point| {
match f(m).call_event(p) {
Result::Err(e) => Result::Err(e),
Result::Ok(true) => observer.call_observer(m, p),
Result::Ok(false) => Result::Ok(())
}
});
comp.subscribe(observer)
}
}
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct Hold<M> {
comp: M,
dt: f64
}
impl<M> Observable for Hold<M>
where M: Observable,
M::Message: Clone + 'static
{
type Message = M::Message;
#[inline]
fn subscribe<O>(self, observer: O) -> EventBox<DisposableBox>
where O: Observer<Message = Self::Message, Item = ()> + 'static
{
let comp = self.comp;
let dt = self.dt;
let r = Grc::new(RefComp::new(false));
let observer = {
let r = r.clone();
let observer = Grc::new(observer);
cons_observer(move |m: &M::Message, p: &Point| {
let m = m.clone();
let r = r.clone();
let observer = observer.clone();
let comp = cons_event(move |p| {
let x = r.read_at(p);
if !x {
observer.call_observer(&m, p)
} else {
Result::Ok(())
}
});
enqueue_event(p.time + dt, comp.into_boxed())
.call_event(p)
})
};
let h = comp.subscribe(observer);
let h = cons_disposable(move |p| {
r.write_at(true, p);
let h = h.call_event(p)?;
h.dispose(p)
}).into_boxed();
cons_event(move |_p| { Result::Ok(h) })
.into_boxed()
}
}
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct HoldC<M, U, F> {
comp: M,
f: F,
_phantom: PhantomData<U>
}
impl<M, U, F> Observable for HoldC<M, U, F>
where M: Observable,
M::Message: Clone + 'static,
F: Fn() -> U + 'static,
U: Event<Item = f64>
{
type Message = M::Message;
#[inline]
fn subscribe<O>(self, observer: O) -> EventBox<DisposableBox>
where O: Observer<Message = Self::Message, Item = ()> + 'static
{
let comp = self.comp;
let f = self.f;
let r = Grc::new(RefComp::new(false));
let observer = {
let r = r.clone();
let observer = Grc::new(observer);
cons_observer(move |m: &M::Message, p: &Point| {
let m = m.clone();
let r = r.clone();
let observer = observer.clone();
let comp = cons_event(move |p| {
let x = r.read_at(p);
if !x {
observer.call_observer(&m, p)
} else {
Result::Ok(())
}
});
let dt = f();
let dt = dt.call_event(p)?;
enqueue_event(p.time + dt, comp.into_boxed())
.call_event(p)
})
};
let h = comp.subscribe(observer);
let h = cons_disposable(move |p| {
r.write_at(true, p);
let h = h.call_event(p)?;
h.dispose(p)
}).into_boxed();
cons_event(move |_p| { Result::Ok(h) })
.into_boxed()
}
}
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct Merge<M, U> {
comp: M,
other: U,
}
impl<M, U> Observable for Merge<M, U>
where M: Observable,
M::Message: 'static,
U: Observable<Message = M::Message>
{
type Message = M::Message;
#[inline]
fn subscribe<O>(self, observer: O) -> EventBox<DisposableBox>
where O: Observer<Message = Self::Message, Item = ()> + 'static
{
let observer = Grc::new(observer);
let comp = self.comp;
let other = self.other;
let observer1 = {
let observer = observer.clone();
cons_observer(move |m: &M::Message, p: &Point| {
observer.call_observer(m, p)
})
};
let observer2 = {
cons_observer(move |m: &M::Message, p: &Point| {
observer.call_observer(m, p)
})
};
let disposable1 = comp.subscribe(observer1);
let disposable2 = other.subscribe(observer2);
disposable1.into_event().and_then(move |disposable1| {
disposable2.into_event().map(move |disposable2| {
let disposable1 = disposable1.into_disposable();
let disposable2 = disposable2.into_disposable();
disposable1.merge(disposable2).into_boxed()
})
}).into_boxed()
}
}
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct Empty<T> {
phantom: PhantomData<T>
}
impl<T> Observable for Empty<T> {
type Message = T;
#[inline]
fn subscribe<O>(self, _observer: O) -> EventBox<DisposableBox>
where O: Observer<Message = Self::Message, Item = ()> + 'static
{
let disposable = empty_disposable().into_boxed();
return_event(disposable).into_boxed()
}
}
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct Delay<F, M> {
f: F,
_phantom: PhantomData<M>
}
impl<F, M> Observable for Delay<F, M>
where F: FnOnce() -> M + 'static,
M: Observable + 'static
{
type Message = M::Message;
#[inline]
fn subscribe<O>(self, observer: O) -> EventBox<DisposableBox>
where O: Observer<Message = Self::Message, Item = ()> + 'static
{
let Delay { f, _phantom } = self;
f().subscribe(observer)
}
}
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct NewSourceInIntegTimes {}
impl Event for NewSourceInIntegTimes {
type Item = Grc<ObservableSource<f64>>;
#[doc(hidden)]
#[inline]
fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
let source = Grc::new(source::ObservableSource::new());
let source2 = source.clone();
enqueue_events_with_integ_times(move || {
let source = source.clone();
cons_event(move |p| {
source.trigger_at(&p.time, p)
}).into_boxed()
}).call_event(p)?;
Result::Ok(source2)
}
}
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct NewSourceInStartTime {}
impl Event for NewSourceInStartTime {
type Item = Grc<ObservableSource<f64>>;
#[doc(hidden)]
#[inline]
fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
let source = Grc::new(source::ObservableSource::new());
let source2 = source.clone();
enqueue_event(p.run.specs.start_time, {
cons_event(move |p| {
source.trigger_at(&p.time, p)
}).into_boxed()
}).call_event(p)?;
Result::Ok(source2)
}
}
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct NewSourceInStopTime {}
impl Event for NewSourceInStopTime {
type Item = Grc<ObservableSource<f64>>;
#[doc(hidden)]
#[inline]
fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
let source = Grc::new(source::ObservableSource::new());
let source2 = source.clone();
enqueue_event(p.run.specs.stop_time, {
cons_event(move |p| {
source.trigger_at(&p.time, p)
}).into_boxed()
}).call_event(p)?;
Result::Ok(source2)
}
}
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct NewInIntegTimes {}
impl Event for NewInIntegTimes {
type Item = ObservableBox<f64>;
#[doc(hidden)]
#[inline]
fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
let comp = NewSourceInIntegTimes {};
let source = comp.call_event(p)?;
Result::Ok(source.publish().into_boxed())
}
}
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct NewInStartTime {}
impl Event for NewInStartTime {
type Item = ObservableBox<f64>;
#[doc(hidden)]
#[inline]
fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
let comp = NewSourceInStartTime {};
let source = comp.call_event(p)?;
Result::Ok(source.publish().into_boxed())
}
}
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct NewInStopTime {}
impl Event for NewInStopTime {
type Item = ObservableBox<f64>;
#[doc(hidden)]
#[inline]
fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
let comp = NewSourceInStopTime {};
let source = comp.call_event(p)?;
Result::Ok(source.publish().into_boxed())
}
}
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct Trace<M> {
comp: M,
msg: String
}
impl<M> Observable for Trace<M>
where M: Observable
{
type Message = M::Message;
#[doc(hidden)]
#[inline]
fn subscribe<O>(self, observer: O) -> EventBox<DisposableBox>
where O: Observer<Message = Self::Message, Item = ()> + 'static
{
let Trace { comp, msg } = self;
comp.subscribe(trace_observer(msg, observer))
}
}