dvcompute_branch/simulation/observable/
mod.rs

1// Copyright (c) 2020-2022  David Sorokin <davsor@mail.ru>, based in Yoshkar-Ola, Russia
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at https://mozilla.org/MPL/2.0/.
6
7use std::marker::PhantomData;
8
9use crate::simulation;
10use crate::simulation::point::Point;
11use crate::simulation::event::*;
12use crate::simulation::ref_comp::RefComp;
13
14use dvcompute_utils::grc::Grc;
15
16/// Defines the disposable computation.
17pub mod disposable;
18
19/// Defines the observer computation.
20pub mod observer;
21
22/// Defines the observable source.
23pub mod source;
24
25/// Random observable computations.
26pub mod random;
27
28/// Defines the signal history.
29pub mod history;
30
31/// Additional operations.
32pub mod ops;
33
34use self::disposable::*;
35use self::observer::*;
36use self::source::*;
37
38// /// Construct a new `Observable` computation by the specified function.
39// #[inline]
40// pub fn cons_observable<F, T>(f: F) -> Cons<F, T>
41//     where F: FnOnce(ObserverBox<T, ()>) -> EventBox<DisposableBox>
42// {
43//      Cons { f: f, phantom: PhantomData }
44// }
45
46/// Return an empty observable that emits no messages.
47#[inline]
48pub fn empty_observable<T>() -> Empty<T> {
49    Empty { phantom: PhantomData }
50}
51
52/// Delay the `Observable` computation.
53#[inline]
54pub fn delay_observable<F, M>(f: F) -> Delay<F, M>
55    where F: FnOnce() -> M + 'static,
56          M: Observable + 'static
57{
58    Delay { f: f, _phantom: PhantomData }
59}
60
61/// Return a source of signal that is triggered in the integration time points.
62/// It should be called in the start time only.
63#[inline]
64pub fn new_observable_source_in_integ_times() -> NewSourceInIntegTimes {
65    NewSourceInIntegTimes {}
66}
67
68/// Return a signal that is triggered in the start time point.
69/// It should be called in the start time only.
70#[inline]
71pub fn new_observable_in_start_time() -> NewInStartTime {
72    NewInStartTime {}
73}
74
75/// Return a source of signal that is triggered in the start time point.
76/// It should be called in the start time only.
77#[inline]
78pub fn new_observable_source_in_start_time() -> NewSourceInStartTime {
79    NewSourceInStartTime {}
80}
81
82/// Return a signal that is triggered in the stop time point.
83/// It should be called in the start time only.
84#[inline]
85pub fn new_observable_in_stop_time() -> NewInStopTime {
86    NewInStopTime {}
87}
88
89/// Return a source of signal that is triggered in the stop time point.
90/// It should be called in the start time only.
91#[inline]
92pub fn new_observable_source_in_stop_time() -> NewSourceInStopTime {
93    NewSourceInStopTime {}
94}
95
96/// Trace the computation.
97#[inline]
98pub fn trace_observable<M>(msg: String, comp: M) -> Trace<M>
99    where M: Observable
100{
101    Trace { comp: comp, msg: msg}
102}
103
104/// The computation that allows notifying about the events.
105pub trait Observable {
106
107    /// The type of messages to notify in the current modeling time.
108    type Message;
109
110    /// Subscribe the observer.
111    fn subscribe<O>(self, observer: O) -> EventBox<DisposableBox>
112        where O: Observer<Message = Self::Message, Item = ()> + Clone + 'static;
113
114    /// Map the current computation using the specified transform.
115    #[inline]
116    fn map<B, F>(self, f: F) -> Map<Self, B, F>
117        where Self: Sized,
118              F: Fn(&Self::Message) -> B + Clone + 'static,
119              Self::Message: Clone + 'static,
120              B: 'static
121    {
122        Map { comp: self, f: f, phantom: PhantomData }
123    }
124
125    /// Map the current computation using the specified transform computation.
126    #[inline]
127    fn mapc<U, B, F>(self, f: F) -> MapC<Self, U, B, F>
128        where Self: Sized,
129              F: Fn(&Self::Message) -> U + Clone + 'static,
130              U: Event<Item = B>,
131              Self::Message: Clone + 'static,
132              B: 'static
133    {
134        MapC { comp: self, f: f, _phantom1: PhantomData, _phantom2: PhantomData }
135    }
136
137    /// Filter the messages.
138    #[inline]
139    fn filter<F>(self, f: F) -> Filter<Self, F>
140        where Self: Sized,
141              F: Fn(&Self::Message) -> bool + Clone + 'static,
142              Self::Message: Clone + 'static
143    {
144        Filter { comp: self, f: f }
145    }
146
147    /// Filter the messages within computation.
148    #[inline]
149    fn filterc<U, F>(self, f: F) -> FilterC<Self, U, F>
150        where Self: Sized,
151              F: Fn(&Self::Message) -> U + Clone + 'static,
152              U: Event<Item = bool>,
153              Self::Message: Clone + 'static
154    {
155        FilterC { comp: self, f: f, _phantom: PhantomData }
156    }
157
158    /// Hold the receiving of messages for the specified time interval.
159    #[inline]
160    fn hold(self, dt: f64) -> Hold<Self>
161        where Self: Sized,
162              Self::Message: Clone + 'static
163    {
164        Hold { comp: self, dt: dt }
165    }
166
167    /// Hold the receiving of messages for intervals calculated within `Event` computation.
168    #[inline]
169    fn holdc<U, F>(self, f: F) -> HoldC<Self, U, F>
170        where Self: Sized,
171              Self::Message: Clone + 'static,
172              F: Fn() -> U + Clone + 'static,
173              U: Event<Item = f64>
174    {
175        HoldC { comp: self, f: f, _phantom: PhantomData }
176    }
177
178    /// Merge the current computation with another one within the resulting computation.
179    #[inline]
180    fn merge<U>(self, other: U) -> Merge<Self, U>
181        where Self: Sized,
182              Self::Message: Clone + 'static,
183              U: Observable<Message = Self::Message>
184    {
185        Merge { comp: self, other: other }
186    }
187
188    /// Convert into a boxed value.
189    #[inline]
190    fn into_boxed(self) -> ObservableBox<Self::Message>
191        where Self: Sized + Clone + 'static
192    {
193        ObservableBox::new(move |observer: ObserverBox<Self::Message, ()>| {
194            self.subscribe(observer.into_observer())
195        })
196    }
197}
198
199/// Allows converting to `Observable` computations.
200pub trait IntoObservable {
201
202    /// The target computation.
203    type Observable: Observable<Message = Self::Message>;
204
205    /// The type of messages about which the computation notifies.
206    type Message;
207
208    /// Convert to the `Observable` computation.
209    fn into_observable(self) -> Self::Observable;
210}
211
212impl<M: Observable> IntoObservable for M {
213
214    type Observable = M;
215
216    type Message = M::Message;
217
218    #[inline]
219    fn into_observable(self) -> Self::Observable {
220        self
221    }
222}
223
224/// It represents the boxed `Observable` computation.
225#[must_use = "computations are lazy and do nothing unless to be run"]
226pub struct ObservableBox<T> {
227    f: Box<dyn ObservableFnBoxClone<T>>
228}
229
230impl<T> ObservableBox<T> {
231
232    /// Create a new boxed computation.
233    #[doc(hidden)]
234    #[inline]
235    fn new<F>(f: F) -> Self
236        where F: FnOnce(ObserverBox<T, ()>) -> EventBox<DisposableBox> + Clone + 'static,
237              T: 'static
238    {
239        ObservableBox {
240            f: Box::new(f)
241        }
242    }
243
244    /// Call the boxed function.
245    #[doc(hidden)]
246    #[inline]
247    pub fn call_box(self, arg: (ObserverBox<T, ()>,)) -> EventBox<DisposableBox> {
248        let ObservableBox { f } = self;
249        f.call_box(arg)
250    }
251}
252
253impl<T> Clone for ObservableBox<T> {
254
255    #[inline]
256    fn clone(&self) -> Self {
257        ObservableBox {
258            f: self.f.call_clone()
259        }
260    }
261}
262
263impl<T> Observable for ObservableBox<T> where T: 'static {
264
265    type Message = T;
266
267    #[inline]
268    fn subscribe<O>(self, observer: O) -> EventBox<DisposableBox>
269        where O: Observer<Message = Self::Message, Item = ()> + Clone + 'static
270    {
271        self.call_box((observer.into_boxed(),))
272    }
273
274    #[inline]
275    fn into_boxed(self) -> ObservableBox<Self::Message>
276        where Self: Sized + Clone + 'static
277    {
278        self
279    }
280}
281
282/// A trait to support the stable version of Rust, where there is no `FnBox`.
283trait ObservableFnBox<T> {
284
285    /// Call the corresponding function.
286    fn call_box(self: Box<Self>, args: (ObserverBox<T, ()>,)) -> EventBox<DisposableBox>;
287}
288
289impl<T, F> ObservableFnBox<T> for F
290    where F: FnOnce(ObserverBox<T, ()>) -> EventBox<DisposableBox>,
291          T: 'static
292{
293    fn call_box(self: Box<Self>, args: (ObserverBox<T, ()>,)) -> EventBox<DisposableBox> {
294        let this: Self = *self;
295        this(args.0)
296    }
297}
298
299/// A trait to implement a cloneable `FnBox`.
300trait ObservableFnBoxClone<T>: ObservableFnBox<T> {
301
302    /// Clone the function.
303    fn call_clone(&self) -> Box<dyn ObservableFnBoxClone<T>>;
304}
305
306impl<T, F> ObservableFnBoxClone<T> for F
307    where F: FnOnce(ObserverBox<T, ()>) -> EventBox<DisposableBox> + Clone + 'static,
308          T: 'static
309{
310    fn call_clone(&self) -> Box<dyn ObservableFnBoxClone<T>> {
311        Box::new(self.clone())
312    }
313}
314
315/// The functor for the `Observable` computation.
316#[must_use = "computations are lazy and do nothing unless to be run"]
317#[derive(Clone)]
318pub struct Map<M, B, F> {
319
320    /// The current computation.
321    comp: M,
322
323    /// The transform.
324    f: F,
325
326    /// To keep the type parameter.
327    phantom: PhantomData<B>
328}
329
330impl<M, B, F> Observable for Map<M, B, F>
331    where M: Observable,
332          F: Fn(&M::Message) -> B + Clone + 'static,
333          M::Message: Clone + 'static,
334          B: 'static
335{
336    type Message = B;
337
338    #[inline]
339    fn subscribe<O>(self, observer: O) -> EventBox<DisposableBox>
340        where O: Observer<Message = Self::Message, Item = ()> + Clone + 'static
341    {
342        let comp     = self.comp;
343        let f        = self.f;
344        let observer = cons_observer(move |m: &M::Message, p: &Point| {
345            let m2 = f(m);
346            observer.call_observer(&m2, p)
347        });
348
349        comp.subscribe(observer)
350    }
351}
352
353/// The transform for the `Observable` computation.
354#[must_use = "computations are lazy and do nothing unless to be run"]
355#[derive(Clone)]
356pub struct MapC<M, U, B, F> {
357
358    /// The current computation.
359    comp: M,
360
361    /// The transform.
362    f: F,
363
364    /// To keep the type parameter.
365    _phantom1: PhantomData<U>,
366
367    /// To keep the type parameter.
368    _phantom2: PhantomData<B>
369}
370
371impl<M, U, B, F> Observable for MapC<M, U, B, F>
372    where M: Observable,
373          F: Fn(&M::Message) -> U + Clone + 'static,
374          U: Event<Item = B>,
375          M::Message: Clone + 'static,
376          B: 'static
377{
378    type Message = B;
379
380    #[inline]
381    fn subscribe<O>(self, observer: O) -> EventBox<DisposableBox>
382        where O: Observer<Message = Self::Message, Item = ()> + Clone + 'static
383    {
384        let comp     = self.comp;
385        let f        = self.f;
386        let observer = cons_observer(move |m: &M::Message, p: &Point| {
387            match f(m).call_event(p) {
388                Result::Err(e) => Result::Err(e),
389                Result::Ok(m2) => observer.call_observer(&m2, p)
390            }
391        });
392
393        comp.subscribe(observer)
394    }
395}
396
397/// The filter for the `Observable` computation.
398#[must_use = "computations are lazy and do nothing unless to be run"]
399#[derive(Clone)]
400pub struct Filter<M, F> {
401
402    /// The current computation.
403    comp: M,
404
405    /// The transform.
406    f: F
407}
408
409impl<M, F> Observable for Filter<M, F>
410    where M: Observable,
411          F: Fn(&M::Message) -> bool + Clone + 'static,
412          M::Message: Clone + 'static
413{
414    type Message = M::Message;
415
416    #[inline]
417    fn subscribe<O>(self, observer: O) -> EventBox<DisposableBox>
418        where O: Observer<Message = Self::Message, Item = ()> + Clone + 'static
419    {
420        let comp     = self.comp;
421        let f        = self.f;
422        let observer = cons_observer(move |m: &M::Message, p: &Point| {
423            if f(m) {
424                observer.call_observer(m, p)
425            } else {
426                Result::Ok(())
427            }
428        });
429
430        comp.subscribe(observer)
431    }
432}
433
434/// The monadic filter for the `Observable` computation.
435#[must_use = "computations are lazy and do nothing unless to be run"]
436#[derive(Clone)]
437pub struct FilterC<M, U, F> {
438
439    /// The current computation.
440    comp: M,
441
442    /// The transform.
443    f: F,
444
445    /// To keep the type parameter.
446    _phantom: PhantomData<U>
447}
448
449impl<M, U, F> Observable for FilterC<M, U, F>
450    where M: Observable,
451          F: Fn(&M::Message) -> U + Clone + 'static,
452          U: Event<Item = bool>,
453          M::Message: Clone + 'static
454{
455    type Message = M::Message;
456
457    #[inline]
458    fn subscribe<O>(self, observer: O) -> EventBox<DisposableBox>
459        where O: Observer<Message = Self::Message, Item = ()> + Clone + 'static
460    {
461        let comp     = self.comp;
462        let f        = self.f;
463        let observer = cons_observer(move |m: &M::Message, p: &Point| {
464            match f(m).call_event(p) {
465                Result::Err(e) => Result::Err(e),
466                Result::Ok(true) => observer.call_observer(m, p),
467                Result::Ok(false) => Result::Ok(())
468            }
469        });
470
471        comp.subscribe(observer)
472    }
473}
474
475/// Hold the receiving of messages within the `Observable` computation.
476#[must_use = "computations are lazy and do nothing unless to be run"]
477#[derive(Clone)]
478pub struct Hold<M> {
479
480    /// The current computation.
481    comp: M,
482
483    /// The delay interval.
484    dt: f64
485}
486
487impl<M> Observable for Hold<M>
488    where M: Observable,
489          M::Message: Clone + 'static
490{
491    type Message = M::Message;
492
493    #[inline]
494    fn subscribe<O>(self, observer: O) -> EventBox<DisposableBox>
495        where O: Observer<Message = Self::Message, Item = ()> + Clone + 'static
496    {
497        let comp = self.comp;
498        let dt   = self.dt;
499        let r    = Grc::new(RefComp::new(false));
500        let observer = {
501            let r = r.clone();
502            let observer = Grc::new(observer);
503            cons_observer(move |m: &M::Message, p: &Point| {
504                let m = m.clone();
505                let r = r.clone();
506                let observer = observer.clone();
507                let comp = cons_event(move |p| {
508                    let x = r.read_at(p);
509                    if !x {
510                        observer.call_observer(&m, p)
511                    } else {
512                        Result::Ok(())
513                    }
514                });
515                enqueue_event(p.time + dt, comp.into_boxed())
516                    .call_event(p)
517            })
518        };
519        let h = comp.subscribe(observer);
520        let h = cons_disposable(move |p| {
521            r.write_at(true, p);
522            let h = h.call_event(p)?;
523            h.dispose(p)
524        }).into_boxed();
525        cons_event(move |_p| { Result::Ok(h) })
526            .into_boxed()
527    }
528}
529
530/// Hold the receiving of messages within the `Observable` computation.
531#[must_use = "computations are lazy and do nothing unless to be run"]
532#[derive(Clone)]
533pub struct HoldC<M, U, F> {
534
535    /// The current computation.
536    comp: M,
537
538    /// Get the delay interval within computation.
539    f: F,
540
541    /// The phantom parameter.
542    _phantom: PhantomData<U>
543}
544
545impl<M, U, F> Observable for HoldC<M, U, F>
546    where M: Observable,
547          M::Message: Clone + 'static,
548          F: Fn() -> U + Clone + 'static,
549          U: Event<Item = f64>
550{
551    type Message = M::Message;
552
553    #[inline]
554    fn subscribe<O>(self, observer: O) -> EventBox<DisposableBox>
555        where O: Observer<Message = Self::Message, Item = ()> + Clone + 'static
556    {
557        let comp = self.comp;
558        let f    = self.f;
559        let r    = Grc::new(RefComp::new(false));
560        let observer = {
561            let r = r.clone();
562            let observer = Grc::new(observer);
563            cons_observer(move |m: &M::Message, p: &Point| {
564                let m = m.clone();
565                let r = r.clone();
566                let observer = observer.clone();
567                let comp = cons_event(move |p| {
568                    let x = r.read_at(p);
569                    if !x {
570                        observer.call_observer(&m, p)
571                    } else {
572                        Result::Ok(())
573                    }
574                });
575                let dt = f();
576                let dt = dt.call_event(p)?;
577                enqueue_event(p.time + dt, comp.into_boxed())
578                    .call_event(p)
579            })
580        };
581        let h = comp.subscribe(observer);
582        let h = cons_disposable(move |p| {
583            r.write_at(true, p);
584            let h = h.call_event(p)?;
585            h.dispose(p)
586        }).into_boxed();
587        cons_event(move |_p| { Result::Ok(h) })
588            .into_boxed()
589    }
590}
591
592/// The merge of two `Observable` computations.
593#[must_use = "computations are lazy and do nothing unless to be run"]
594#[derive(Clone)]
595pub struct Merge<M, U> {
596
597    /// The current computation.
598    comp: M,
599
600    /// Another computation.
601    other: U,
602}
603
604impl<M, U> Observable for Merge<M, U>
605    where M: Observable,
606          M::Message: Clone + 'static,
607          U: Observable<Message = M::Message>
608{
609    type Message = M::Message;
610
611    #[inline]
612    fn subscribe<O>(self, observer: O) -> EventBox<DisposableBox>
613        where O: Observer<Message = Self::Message, Item = ()> + Clone + 'static
614    {
615        let observer  = Grc::new(observer);
616        let comp      = self.comp;
617        let other     = self.other;
618        let observer1 = {
619            let observer = observer.clone();
620            cons_observer(move |m: &M::Message, p: &Point| {
621                observer.call_observer(m, p)
622            })
623        };
624        let observer2 = {
625            cons_observer(move |m: &M::Message, p: &Point| {
626                observer.call_observer(m, p)
627            })
628        };
629        let disposable1 = comp.subscribe(observer1);
630        let disposable2 = other.subscribe(observer2);
631
632        disposable1.into_event().and_then(move |disposable1| {
633            disposable2.into_event().map(move |disposable2| {
634                let disposable1 = disposable1.into_disposable();
635                let disposable2 = disposable2.into_disposable();
636
637                disposable1.merge(disposable2).into_boxed()
638            })
639        }).into_boxed()
640    }
641}
642
643/// The empty `Observable` computation.
644#[must_use = "computations are lazy and do nothing unless to be run"]
645#[derive(Clone)]
646pub struct Empty<T> {
647
648    /// The transform.
649    phantom: PhantomData<T>
650}
651
652impl<T> Observable for Empty<T> {
653
654    type Message = T;
655
656    #[inline]
657    fn subscribe<O>(self, _observer: O) -> EventBox<DisposableBox>
658        where O: Observer<Message = Self::Message, Item = ()> + Clone + 'static
659    {
660        let disposable = empty_disposable().into_boxed();
661        return_event(disposable).into_boxed()
662    }
663}
664
665/// Allows delaying the `Observable` computation by the specified function.
666#[must_use = "computations are lazy and do nothing unless to be run"]
667#[derive(Clone)]
668pub struct Delay<F, M> {
669
670    /// Return the computation.
671    f: F,
672
673    /// To keep the type parameter.
674    _phantom: PhantomData<M>
675}
676
677impl<F, M> Observable for Delay<F, M>
678    where F: FnOnce() -> M + 'static,
679          M: Observable + 'static
680{
681    type Message = M::Message;
682
683    #[inline]
684    fn subscribe<O>(self, observer: O) -> EventBox<DisposableBox>
685        where O: Observer<Message = Self::Message, Item = ()> + Clone + 'static
686    {
687        let Delay { f, _phantom } = self;
688        f().subscribe(observer)
689    }
690}
691
692/// A source of signal in the integration time points.
693#[must_use = "computations are lazy and do nothing unless to be run"]
694#[derive(Clone)]
695pub struct NewSourceInIntegTimes {}
696
697impl Event for NewSourceInIntegTimes {
698
699    type Item = Grc<ObservableSource<f64>>;
700
701    #[doc(hidden)]
702    #[inline]
703    fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
704        let source  = Grc::new(source::ObservableSource::new());
705        let source2 = source.clone();
706        enqueue_events_with_integ_times(move || {
707            let source = source.clone();
708            cons_event(move |p| {
709                source.trigger_at(&p.time, p)
710            }).into_boxed()
711        }).call_event(p)?;
712        Result::Ok(source2)
713    }
714}
715
716/// A source of signal in the start time point.
717#[must_use = "computations are lazy and do nothing unless to be run"]
718#[derive(Clone)]
719pub struct NewSourceInStartTime {}
720
721impl Event for NewSourceInStartTime {
722
723    type Item = Grc<ObservableSource<f64>>;
724
725    #[doc(hidden)]
726    #[inline]
727    fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
728        let source  = Grc::new(source::ObservableSource::new());
729        let source2 = source.clone();
730        enqueue_event(p.run.specs.start_time, {
731            cons_event(move |p| {
732                source.trigger_at(&p.time, p)
733            }).into_boxed()
734        }).call_event(p)?;
735        Result::Ok(source2)
736    }
737}
738
739/// A source of signal in the stop time point.
740#[must_use = "computations are lazy and do nothing unless to be run"]
741#[derive(Clone)]
742pub struct NewSourceInStopTime {}
743
744impl Event for NewSourceInStopTime {
745
746    type Item = Grc<ObservableSource<f64>>;
747
748    #[doc(hidden)]
749    #[inline]
750    fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
751        let source  = Grc::new(source::ObservableSource::new());
752        let source2 = source.clone();
753        enqueue_event(p.run.specs.stop_time, {
754            cons_event(move |p| {
755                source.trigger_at(&p.time, p)
756            }).into_boxed()
757        }).call_event(p)?;
758        Result::Ok(source2)
759    }
760}
761
762/// Trigger a signal in the integration time points.
763#[must_use = "computations are lazy and do nothing unless to be run"]
764#[derive(Clone)]
765pub struct NewInIntegTimes {}
766
767impl Event for NewInIntegTimes {
768
769    type Item = ObservableBox<f64>;
770
771    #[doc(hidden)]
772    #[inline]
773    fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
774        let comp   = NewSourceInIntegTimes {};
775        let source = comp.call_event(p)?;
776        Result::Ok(source.publish().into_boxed())
777    }
778}
779
780/// Trigger a signal in the start time point.
781#[must_use = "computations are lazy and do nothing unless to be run"]
782#[derive(Clone)]
783pub struct NewInStartTime {}
784
785impl Event for NewInStartTime {
786
787    type Item = ObservableBox<f64>;
788
789    #[doc(hidden)]
790    #[inline]
791    fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
792        let comp   = NewSourceInStartTime {};
793        let source = comp.call_event(p)?;
794        Result::Ok(source.publish().into_boxed())
795    }
796}
797
798/// Trigger a signal in the stop time point.
799#[must_use = "computations are lazy and do nothing unless to be run"]
800#[derive(Clone)]
801pub struct NewInStopTime {}
802
803impl Event for NewInStopTime {
804
805    type Item = ObservableBox<f64>;
806
807    #[doc(hidden)]
808    #[inline]
809    fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
810        let comp   = NewSourceInStopTime {};
811        let source = comp.call_event(p)?;
812        Result::Ok(source.publish().into_boxed())
813    }
814}
815
816/// Trace the computation.
817#[must_use = "computations are lazy and do nothing unless to be run"]
818#[derive(Clone)]
819pub struct Trace<M> {
820
821    /// The computation.
822    comp: M,
823
824    /// The message to print.
825    msg: String
826}
827
828impl<M> Observable for Trace<M>
829    where M: Observable
830{
831    type Message = M::Message;
832
833    #[doc(hidden)]
834    #[inline]
835    fn subscribe<O>(self, observer: O) -> EventBox<DisposableBox>
836        where O: Observer<Message = Self::Message, Item = ()> + Clone + 'static
837    {
838        let Trace { comp, msg } = self;
839        comp.subscribe(trace_observer(msg, observer))
840    }
841}