1use std::marker::PhantomData;
8
9use crate::simulation;
10use crate::simulation::point::Point;
11use crate::simulation::event::*;
12use crate::simulation::ref_comp::RefComp;
13
14use dvcompute_utils::grc::Grc;
15
16pub mod disposable;
18
19pub mod observer;
21
22pub mod source;
24
25pub mod random;
27
28pub mod history;
30
31pub mod ops;
33
34use self::disposable::*;
35use self::observer::*;
36use self::source::*;
37
38#[inline]
48pub fn empty_observable<T>() -> Empty<T> {
49 Empty { phantom: PhantomData }
50}
51
52#[inline]
54pub fn delay_observable<F, M>(f: F) -> Delay<F, M>
55 where F: FnOnce() -> M + 'static,
56 M: Observable + 'static
57{
58 Delay { f: f, _phantom: PhantomData }
59}
60
61#[inline]
64pub fn new_observable_source_in_integ_times() -> NewSourceInIntegTimes {
65 NewSourceInIntegTimes {}
66}
67
68#[inline]
71pub fn new_observable_in_start_time() -> NewInStartTime {
72 NewInStartTime {}
73}
74
75#[inline]
78pub fn new_observable_source_in_start_time() -> NewSourceInStartTime {
79 NewSourceInStartTime {}
80}
81
82#[inline]
85pub fn new_observable_in_stop_time() -> NewInStopTime {
86 NewInStopTime {}
87}
88
89#[inline]
92pub fn new_observable_source_in_stop_time() -> NewSourceInStopTime {
93 NewSourceInStopTime {}
94}
95
96#[inline]
98pub fn trace_observable<M>(msg: String, comp: M) -> Trace<M>
99 where M: Observable
100{
101 Trace { comp: comp, msg: msg}
102}
103
104pub trait Observable {
106
107 type Message;
109
110 fn subscribe<O>(self, observer: O) -> EventBox<DisposableBox>
112 where O: Observer<Message = Self::Message, Item = ()> + Clone + 'static;
113
114 #[inline]
116 fn map<B, F>(self, f: F) -> Map<Self, B, F>
117 where Self: Sized,
118 F: Fn(&Self::Message) -> B + Clone + 'static,
119 Self::Message: Clone + 'static,
120 B: 'static
121 {
122 Map { comp: self, f: f, phantom: PhantomData }
123 }
124
125 #[inline]
127 fn mapc<U, B, F>(self, f: F) -> MapC<Self, U, B, F>
128 where Self: Sized,
129 F: Fn(&Self::Message) -> U + Clone + 'static,
130 U: Event<Item = B>,
131 Self::Message: Clone + 'static,
132 B: 'static
133 {
134 MapC { comp: self, f: f, _phantom1: PhantomData, _phantom2: PhantomData }
135 }
136
137 #[inline]
139 fn filter<F>(self, f: F) -> Filter<Self, F>
140 where Self: Sized,
141 F: Fn(&Self::Message) -> bool + Clone + 'static,
142 Self::Message: Clone + 'static
143 {
144 Filter { comp: self, f: f }
145 }
146
147 #[inline]
149 fn filterc<U, F>(self, f: F) -> FilterC<Self, U, F>
150 where Self: Sized,
151 F: Fn(&Self::Message) -> U + Clone + 'static,
152 U: Event<Item = bool>,
153 Self::Message: Clone + 'static
154 {
155 FilterC { comp: self, f: f, _phantom: PhantomData }
156 }
157
158 #[inline]
160 fn hold(self, dt: f64) -> Hold<Self>
161 where Self: Sized,
162 Self::Message: Clone + 'static
163 {
164 Hold { comp: self, dt: dt }
165 }
166
167 #[inline]
169 fn holdc<U, F>(self, f: F) -> HoldC<Self, U, F>
170 where Self: Sized,
171 Self::Message: Clone + 'static,
172 F: Fn() -> U + Clone + 'static,
173 U: Event<Item = f64>
174 {
175 HoldC { comp: self, f: f, _phantom: PhantomData }
176 }
177
178 #[inline]
180 fn merge<U>(self, other: U) -> Merge<Self, U>
181 where Self: Sized,
182 Self::Message: Clone + 'static,
183 U: Observable<Message = Self::Message>
184 {
185 Merge { comp: self, other: other }
186 }
187
188 #[inline]
190 fn into_boxed(self) -> ObservableBox<Self::Message>
191 where Self: Sized + Clone + 'static
192 {
193 ObservableBox::new(move |observer: ObserverBox<Self::Message, ()>| {
194 self.subscribe(observer.into_observer())
195 })
196 }
197}
198
199pub trait IntoObservable {
201
202 type Observable: Observable<Message = Self::Message>;
204
205 type Message;
207
208 fn into_observable(self) -> Self::Observable;
210}
211
212impl<M: Observable> IntoObservable for M {
213
214 type Observable = M;
215
216 type Message = M::Message;
217
218 #[inline]
219 fn into_observable(self) -> Self::Observable {
220 self
221 }
222}
223
224#[must_use = "computations are lazy and do nothing unless to be run"]
226pub struct ObservableBox<T> {
227 f: Box<dyn ObservableFnBoxClone<T>>
228}
229
230impl<T> ObservableBox<T> {
231
232 #[doc(hidden)]
234 #[inline]
235 fn new<F>(f: F) -> Self
236 where F: FnOnce(ObserverBox<T, ()>) -> EventBox<DisposableBox> + Clone + 'static,
237 T: 'static
238 {
239 ObservableBox {
240 f: Box::new(f)
241 }
242 }
243
244 #[doc(hidden)]
246 #[inline]
247 pub fn call_box(self, arg: (ObserverBox<T, ()>,)) -> EventBox<DisposableBox> {
248 let ObservableBox { f } = self;
249 f.call_box(arg)
250 }
251}
252
253impl<T> Clone for ObservableBox<T> {
254
255 #[inline]
256 fn clone(&self) -> Self {
257 ObservableBox {
258 f: self.f.call_clone()
259 }
260 }
261}
262
263impl<T> Observable for ObservableBox<T> where T: 'static {
264
265 type Message = T;
266
267 #[inline]
268 fn subscribe<O>(self, observer: O) -> EventBox<DisposableBox>
269 where O: Observer<Message = Self::Message, Item = ()> + Clone + 'static
270 {
271 self.call_box((observer.into_boxed(),))
272 }
273
274 #[inline]
275 fn into_boxed(self) -> ObservableBox<Self::Message>
276 where Self: Sized + Clone + 'static
277 {
278 self
279 }
280}
281
282trait ObservableFnBox<T> {
284
285 fn call_box(self: Box<Self>, args: (ObserverBox<T, ()>,)) -> EventBox<DisposableBox>;
287}
288
289impl<T, F> ObservableFnBox<T> for F
290 where F: FnOnce(ObserverBox<T, ()>) -> EventBox<DisposableBox>,
291 T: 'static
292{
293 fn call_box(self: Box<Self>, args: (ObserverBox<T, ()>,)) -> EventBox<DisposableBox> {
294 let this: Self = *self;
295 this(args.0)
296 }
297}
298
299trait ObservableFnBoxClone<T>: ObservableFnBox<T> {
301
302 fn call_clone(&self) -> Box<dyn ObservableFnBoxClone<T>>;
304}
305
306impl<T, F> ObservableFnBoxClone<T> for F
307 where F: FnOnce(ObserverBox<T, ()>) -> EventBox<DisposableBox> + Clone + 'static,
308 T: 'static
309{
310 fn call_clone(&self) -> Box<dyn ObservableFnBoxClone<T>> {
311 Box::new(self.clone())
312 }
313}
314
315#[must_use = "computations are lazy and do nothing unless to be run"]
317#[derive(Clone)]
318pub struct Map<M, B, F> {
319
320 comp: M,
322
323 f: F,
325
326 phantom: PhantomData<B>
328}
329
330impl<M, B, F> Observable for Map<M, B, F>
331 where M: Observable,
332 F: Fn(&M::Message) -> B + Clone + 'static,
333 M::Message: Clone + 'static,
334 B: 'static
335{
336 type Message = B;
337
338 #[inline]
339 fn subscribe<O>(self, observer: O) -> EventBox<DisposableBox>
340 where O: Observer<Message = Self::Message, Item = ()> + Clone + 'static
341 {
342 let comp = self.comp;
343 let f = self.f;
344 let observer = cons_observer(move |m: &M::Message, p: &Point| {
345 let m2 = f(m);
346 observer.call_observer(&m2, p)
347 });
348
349 comp.subscribe(observer)
350 }
351}
352
353#[must_use = "computations are lazy and do nothing unless to be run"]
355#[derive(Clone)]
356pub struct MapC<M, U, B, F> {
357
358 comp: M,
360
361 f: F,
363
364 _phantom1: PhantomData<U>,
366
367 _phantom2: PhantomData<B>
369}
370
371impl<M, U, B, F> Observable for MapC<M, U, B, F>
372 where M: Observable,
373 F: Fn(&M::Message) -> U + Clone + 'static,
374 U: Event<Item = B>,
375 M::Message: Clone + 'static,
376 B: 'static
377{
378 type Message = B;
379
380 #[inline]
381 fn subscribe<O>(self, observer: O) -> EventBox<DisposableBox>
382 where O: Observer<Message = Self::Message, Item = ()> + Clone + 'static
383 {
384 let comp = self.comp;
385 let f = self.f;
386 let observer = cons_observer(move |m: &M::Message, p: &Point| {
387 match f(m).call_event(p) {
388 Result::Err(e) => Result::Err(e),
389 Result::Ok(m2) => observer.call_observer(&m2, p)
390 }
391 });
392
393 comp.subscribe(observer)
394 }
395}
396
397#[must_use = "computations are lazy and do nothing unless to be run"]
399#[derive(Clone)]
400pub struct Filter<M, F> {
401
402 comp: M,
404
405 f: F
407}
408
409impl<M, F> Observable for Filter<M, F>
410 where M: Observable,
411 F: Fn(&M::Message) -> bool + Clone + 'static,
412 M::Message: Clone + 'static
413{
414 type Message = M::Message;
415
416 #[inline]
417 fn subscribe<O>(self, observer: O) -> EventBox<DisposableBox>
418 where O: Observer<Message = Self::Message, Item = ()> + Clone + 'static
419 {
420 let comp = self.comp;
421 let f = self.f;
422 let observer = cons_observer(move |m: &M::Message, p: &Point| {
423 if f(m) {
424 observer.call_observer(m, p)
425 } else {
426 Result::Ok(())
427 }
428 });
429
430 comp.subscribe(observer)
431 }
432}
433
434#[must_use = "computations are lazy and do nothing unless to be run"]
436#[derive(Clone)]
437pub struct FilterC<M, U, F> {
438
439 comp: M,
441
442 f: F,
444
445 _phantom: PhantomData<U>
447}
448
449impl<M, U, F> Observable for FilterC<M, U, F>
450 where M: Observable,
451 F: Fn(&M::Message) -> U + Clone + 'static,
452 U: Event<Item = bool>,
453 M::Message: Clone + 'static
454{
455 type Message = M::Message;
456
457 #[inline]
458 fn subscribe<O>(self, observer: O) -> EventBox<DisposableBox>
459 where O: Observer<Message = Self::Message, Item = ()> + Clone + 'static
460 {
461 let comp = self.comp;
462 let f = self.f;
463 let observer = cons_observer(move |m: &M::Message, p: &Point| {
464 match f(m).call_event(p) {
465 Result::Err(e) => Result::Err(e),
466 Result::Ok(true) => observer.call_observer(m, p),
467 Result::Ok(false) => Result::Ok(())
468 }
469 });
470
471 comp.subscribe(observer)
472 }
473}
474
475#[must_use = "computations are lazy and do nothing unless to be run"]
477#[derive(Clone)]
478pub struct Hold<M> {
479
480 comp: M,
482
483 dt: f64
485}
486
487impl<M> Observable for Hold<M>
488 where M: Observable,
489 M::Message: Clone + 'static
490{
491 type Message = M::Message;
492
493 #[inline]
494 fn subscribe<O>(self, observer: O) -> EventBox<DisposableBox>
495 where O: Observer<Message = Self::Message, Item = ()> + Clone + 'static
496 {
497 let comp = self.comp;
498 let dt = self.dt;
499 let r = Grc::new(RefComp::new(false));
500 let observer = {
501 let r = r.clone();
502 let observer = Grc::new(observer);
503 cons_observer(move |m: &M::Message, p: &Point| {
504 let m = m.clone();
505 let r = r.clone();
506 let observer = observer.clone();
507 let comp = cons_event(move |p| {
508 let x = r.read_at(p);
509 if !x {
510 observer.call_observer(&m, p)
511 } else {
512 Result::Ok(())
513 }
514 });
515 enqueue_event(p.time + dt, comp.into_boxed())
516 .call_event(p)
517 })
518 };
519 let h = comp.subscribe(observer);
520 let h = cons_disposable(move |p| {
521 r.write_at(true, p);
522 let h = h.call_event(p)?;
523 h.dispose(p)
524 }).into_boxed();
525 cons_event(move |_p| { Result::Ok(h) })
526 .into_boxed()
527 }
528}
529
530#[must_use = "computations are lazy and do nothing unless to be run"]
532#[derive(Clone)]
533pub struct HoldC<M, U, F> {
534
535 comp: M,
537
538 f: F,
540
541 _phantom: PhantomData<U>
543}
544
545impl<M, U, F> Observable for HoldC<M, U, F>
546 where M: Observable,
547 M::Message: Clone + 'static,
548 F: Fn() -> U + Clone + 'static,
549 U: Event<Item = f64>
550{
551 type Message = M::Message;
552
553 #[inline]
554 fn subscribe<O>(self, observer: O) -> EventBox<DisposableBox>
555 where O: Observer<Message = Self::Message, Item = ()> + Clone + 'static
556 {
557 let comp = self.comp;
558 let f = self.f;
559 let r = Grc::new(RefComp::new(false));
560 let observer = {
561 let r = r.clone();
562 let observer = Grc::new(observer);
563 cons_observer(move |m: &M::Message, p: &Point| {
564 let m = m.clone();
565 let r = r.clone();
566 let observer = observer.clone();
567 let comp = cons_event(move |p| {
568 let x = r.read_at(p);
569 if !x {
570 observer.call_observer(&m, p)
571 } else {
572 Result::Ok(())
573 }
574 });
575 let dt = f();
576 let dt = dt.call_event(p)?;
577 enqueue_event(p.time + dt, comp.into_boxed())
578 .call_event(p)
579 })
580 };
581 let h = comp.subscribe(observer);
582 let h = cons_disposable(move |p| {
583 r.write_at(true, p);
584 let h = h.call_event(p)?;
585 h.dispose(p)
586 }).into_boxed();
587 cons_event(move |_p| { Result::Ok(h) })
588 .into_boxed()
589 }
590}
591
592#[must_use = "computations are lazy and do nothing unless to be run"]
594#[derive(Clone)]
595pub struct Merge<M, U> {
596
597 comp: M,
599
600 other: U,
602}
603
604impl<M, U> Observable for Merge<M, U>
605 where M: Observable,
606 M::Message: Clone + 'static,
607 U: Observable<Message = M::Message>
608{
609 type Message = M::Message;
610
611 #[inline]
612 fn subscribe<O>(self, observer: O) -> EventBox<DisposableBox>
613 where O: Observer<Message = Self::Message, Item = ()> + Clone + 'static
614 {
615 let observer = Grc::new(observer);
616 let comp = self.comp;
617 let other = self.other;
618 let observer1 = {
619 let observer = observer.clone();
620 cons_observer(move |m: &M::Message, p: &Point| {
621 observer.call_observer(m, p)
622 })
623 };
624 let observer2 = {
625 cons_observer(move |m: &M::Message, p: &Point| {
626 observer.call_observer(m, p)
627 })
628 };
629 let disposable1 = comp.subscribe(observer1);
630 let disposable2 = other.subscribe(observer2);
631
632 disposable1.into_event().and_then(move |disposable1| {
633 disposable2.into_event().map(move |disposable2| {
634 let disposable1 = disposable1.into_disposable();
635 let disposable2 = disposable2.into_disposable();
636
637 disposable1.merge(disposable2).into_boxed()
638 })
639 }).into_boxed()
640 }
641}
642
643#[must_use = "computations are lazy and do nothing unless to be run"]
645#[derive(Clone)]
646pub struct Empty<T> {
647
648 phantom: PhantomData<T>
650}
651
652impl<T> Observable for Empty<T> {
653
654 type Message = T;
655
656 #[inline]
657 fn subscribe<O>(self, _observer: O) -> EventBox<DisposableBox>
658 where O: Observer<Message = Self::Message, Item = ()> + Clone + 'static
659 {
660 let disposable = empty_disposable().into_boxed();
661 return_event(disposable).into_boxed()
662 }
663}
664
665#[must_use = "computations are lazy and do nothing unless to be run"]
667#[derive(Clone)]
668pub struct Delay<F, M> {
669
670 f: F,
672
673 _phantom: PhantomData<M>
675}
676
677impl<F, M> Observable for Delay<F, M>
678 where F: FnOnce() -> M + 'static,
679 M: Observable + 'static
680{
681 type Message = M::Message;
682
683 #[inline]
684 fn subscribe<O>(self, observer: O) -> EventBox<DisposableBox>
685 where O: Observer<Message = Self::Message, Item = ()> + Clone + 'static
686 {
687 let Delay { f, _phantom } = self;
688 f().subscribe(observer)
689 }
690}
691
692#[must_use = "computations are lazy and do nothing unless to be run"]
694#[derive(Clone)]
695pub struct NewSourceInIntegTimes {}
696
697impl Event for NewSourceInIntegTimes {
698
699 type Item = Grc<ObservableSource<f64>>;
700
701 #[doc(hidden)]
702 #[inline]
703 fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
704 let source = Grc::new(source::ObservableSource::new());
705 let source2 = source.clone();
706 enqueue_events_with_integ_times(move || {
707 let source = source.clone();
708 cons_event(move |p| {
709 source.trigger_at(&p.time, p)
710 }).into_boxed()
711 }).call_event(p)?;
712 Result::Ok(source2)
713 }
714}
715
716#[must_use = "computations are lazy and do nothing unless to be run"]
718#[derive(Clone)]
719pub struct NewSourceInStartTime {}
720
721impl Event for NewSourceInStartTime {
722
723 type Item = Grc<ObservableSource<f64>>;
724
725 #[doc(hidden)]
726 #[inline]
727 fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
728 let source = Grc::new(source::ObservableSource::new());
729 let source2 = source.clone();
730 enqueue_event(p.run.specs.start_time, {
731 cons_event(move |p| {
732 source.trigger_at(&p.time, p)
733 }).into_boxed()
734 }).call_event(p)?;
735 Result::Ok(source2)
736 }
737}
738
739#[must_use = "computations are lazy and do nothing unless to be run"]
741#[derive(Clone)]
742pub struct NewSourceInStopTime {}
743
744impl Event for NewSourceInStopTime {
745
746 type Item = Grc<ObservableSource<f64>>;
747
748 #[doc(hidden)]
749 #[inline]
750 fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
751 let source = Grc::new(source::ObservableSource::new());
752 let source2 = source.clone();
753 enqueue_event(p.run.specs.stop_time, {
754 cons_event(move |p| {
755 source.trigger_at(&p.time, p)
756 }).into_boxed()
757 }).call_event(p)?;
758 Result::Ok(source2)
759 }
760}
761
762#[must_use = "computations are lazy and do nothing unless to be run"]
764#[derive(Clone)]
765pub struct NewInIntegTimes {}
766
767impl Event for NewInIntegTimes {
768
769 type Item = ObservableBox<f64>;
770
771 #[doc(hidden)]
772 #[inline]
773 fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
774 let comp = NewSourceInIntegTimes {};
775 let source = comp.call_event(p)?;
776 Result::Ok(source.publish().into_boxed())
777 }
778}
779
780#[must_use = "computations are lazy and do nothing unless to be run"]
782#[derive(Clone)]
783pub struct NewInStartTime {}
784
785impl Event for NewInStartTime {
786
787 type Item = ObservableBox<f64>;
788
789 #[doc(hidden)]
790 #[inline]
791 fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
792 let comp = NewSourceInStartTime {};
793 let source = comp.call_event(p)?;
794 Result::Ok(source.publish().into_boxed())
795 }
796}
797
798#[must_use = "computations are lazy and do nothing unless to be run"]
800#[derive(Clone)]
801pub struct NewInStopTime {}
802
803impl Event for NewInStopTime {
804
805 type Item = ObservableBox<f64>;
806
807 #[doc(hidden)]
808 #[inline]
809 fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
810 let comp = NewSourceInStopTime {};
811 let source = comp.call_event(p)?;
812 Result::Ok(source.publish().into_boxed())
813 }
814}
815
816#[must_use = "computations are lazy and do nothing unless to be run"]
818#[derive(Clone)]
819pub struct Trace<M> {
820
821 comp: M,
823
824 msg: String
826}
827
828impl<M> Observable for Trace<M>
829 where M: Observable
830{
831 type Message = M::Message;
832
833 #[doc(hidden)]
834 #[inline]
835 fn subscribe<O>(self, observer: O) -> EventBox<DisposableBox>
836 where O: Observer<Message = Self::Message, Item = ()> + Clone + 'static
837 {
838 let Trace { comp, msg } = self;
839 comp.subscribe(trace_observer(msg, observer))
840 }
841}