dvcompute_dist/simulation/observable/
observer.rs

1// Copyright (c) 2020-2022  David Sorokin <davsor@mail.ru>, based in Yoshkar-Ola, Russia
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at https://mozilla.org/MPL/2.0/.
6
7use std::marker::PhantomData;
8
9#[cfg(feature="dist_mode")]
10use std::sync::Arc;
11
12#[cfg(feature="dist_mode")]
13use std::slice;
14
15#[cfg(feature="dist_mode")]
16use std::mem;
17
18#[cfg(feature="dist_mode")]
19use std::ptr;
20
21#[cfg(feature="dist_mode")]
22use libc::*;
23
24use crate::simulation;
25use crate::simulation::point::Point;
26
27#[cfg(feature="dist_mode")]
28use crate::simulation::error::*;
29
30#[cfg(feature="dist_mode")]
31use crate::simulation::utils::byte_vec::*;
32
33use dvcompute_utils::grc::Grc;
34
35/// Return a new `Observer` computation by the specified pure value.
36#[inline]
37pub fn return_observer<M, T>(val: T) -> Return<M, T>
38    where T: Clone
39{
40    Return { val: val, _phantom: PhantomData }
41}
42
43/// Delay the `Observer` computation.
44#[inline]
45pub fn delay_observer<F, O>(f: F) -> Delay<F, O>
46    where F: Fn() -> O,
47          O: Observer
48{
49    Delay { f: f, _phantom: PhantomData }
50}
51
52/// Construct a new `Observer` computation by the specified function.
53#[inline]
54pub fn cons_observer<F, M, T>(f: F) -> Cons<F, M, T>
55    where F: Fn(&M, &Point) -> simulation::Result<T>
56{
57     Cons { f: f, _phantom1: PhantomData, _phantom2: PhantomData }
58}
59
60/// Return a new `Observer` computation that returns the outer message which it should react to.
61#[inline]
62pub fn message_observer<M>() -> Message<M>
63    where M: Clone
64{
65    Message { _phantom: PhantomData }
66}
67
68/// Trace the computation.
69#[inline]
70pub fn trace_observer<O>(msg: String, comp: O) -> Trace<O>
71    where O: Observer
72{
73    Trace { comp: comp, msg: msg}
74}
75
76/// The computation that receives notifications.
77pub trait Observer {
78
79    /// The outer message that the observer reacts to.
80    type Message;
81
82    /// The type of the item that the computation returns.
83    type Item;
84
85    /// Call the computation in the current modeling time to react to the specified message.
86    #[doc(hidden)]
87    fn call_observer(&self, m: &Self::Message, p: &Point) -> simulation::Result<Self::Item>;
88
89    /// Bind the current computation with its continuation within the resulting computation.
90    #[inline]
91    fn and_then<U, F>(self, f: F) -> AndThen<Self, U, F>
92        where Self: Sized,
93              U: Observer<Message = Self::Message>,
94              F: Fn(Self::Item) -> U,
95    {
96        AndThen { comp: self, f: f, _phantom: PhantomData }
97    }
98
99    /// Map the current computation using the specified transform.
100    #[inline]
101    fn map<B, F>(self, f: F) -> Map<Self, B, F>
102        where Self: Sized,
103              F: Fn(Self::Item) -> B,
104    {
105        Map { comp: self, f: f, _phantom: PhantomData }
106    }
107
108    /// Zip the current computation with another one within the resulting computation.
109    #[inline]
110    fn zip<U>(self, other: U) -> Zip<Self, U>
111        where Self: Sized,
112              U: Observer<Message = Self::Message>
113    {
114        Zip { comp: self, other: other }
115    }
116
117    /// The function application.
118    #[inline]
119    fn ap<U, B>(self, other: U) -> Ap<Self, U, B>
120        where Self: Sized,
121              Self::Item: Fn(U::Item) -> B,
122              U: Observer<Message = Self::Message>
123    {
124        Ap { comp: self, other: other, _phantom: PhantomData }
125    }
126
127    /// Convert into a boxed value.
128    #[inline]
129    fn into_boxed(self) -> ObserverBox<Self::Message, Self::Item>
130        where Self: Sized + Clone + 'static
131    {
132        ObserverBox::new(move |m: &Self::Message, p: &Point| {
133            self.call_observer(m, p)
134        })
135    }
136}
137
138/// Allows converting to `Observer` computations.
139pub trait IntoObserver {
140
141    /// The target computation.
142    type Observer: Observer<Message = Self::Message, Item = Self::Item>;
143
144    /// The type of messages that are processed by the observer.
145    type Message;
146
147    /// The type of items that the observer returns.
148    type Item;
149
150    /// Convert to the `Observer` computation.
151    fn into_observer(self) -> Self::Observer;
152}
153
154impl<M: Observer> IntoObserver for M {
155
156    type Observer = M;
157
158    type Message = M::Message;
159
160    type Item = M::Item;
161
162    #[inline]
163    fn into_observer(self) -> Self::Observer {
164        self
165    }
166}
167
168/// It represents the boxed `Observer` computation.
169#[must_use = "computations are lazy and do nothing unless to be run"]
170pub struct ObserverBox<M, T> where M: 'static, T: 'static {
171    f: Grc<Box<dyn Fn(&M, &Point) -> simulation::Result<T>>>
172}
173
174impl<M, T> ObserverBox<M, T> {
175
176    /// Create a new boxed computation.
177    #[doc(hidden)]
178    #[inline]
179    fn new<F>(f: F) -> Self
180        where F: Fn(&M, &Point) -> simulation::Result<T> + Clone + 'static
181    {
182        ObserverBox {
183            f: Grc::new(Box::new(f))
184        }
185    }
186
187    /// Call the boxed function.
188    #[doc(hidden)]
189    #[inline]
190    pub fn call(&self, m: &M, p: &Point) -> simulation::Result<T> {
191        let ObserverBox { f } = self;
192        f(m, p)
193    }
194}
195
196impl<M, T> Clone for ObserverBox<M, T> {
197
198    #[inline]
199    fn clone(&self) -> Self {
200        let f = &self.f;
201        ObserverBox {
202            f: f.clone()
203        }
204    }
205}
206
207impl<M, T> Observer for ObserverBox<M, T> {
208
209    type Message = M;
210    type Item = T;
211
212    #[doc(hidden)]
213    #[inline]
214    fn call_observer(&self, m: &M, p: &Point) -> simulation::Result<T> {
215        self.call(m, p)
216    }
217
218    #[inline]
219    fn into_boxed(self) -> ObserverBox<Self::Message, Self::Item>
220        where Self: Sized + Clone + 'static
221    {
222        self
223    }
224}
225
226/// It represents a raw trait object.
227#[cfg(feature="dist_mode")]
228#[repr(C)]
229#[derive(Copy, Clone)]
230struct ObserverTraitObject {
231
232    field1: *mut c_void,
233    field2: *mut c_void
234}
235
236/// A C-friendly representaton of the `Observer` computation.
237#[cfg(feature="dist_mode")]
238#[repr(C)]
239pub struct ObserverRepr {
240
241    /// Delete the object.
242    delete: unsafe extern "C" fn(obj: *mut ObserverTraitObject),
243
244    /// Clone the computation.
245    clone: unsafe extern "C" fn(obj: *const ObserverTraitObject) -> ObserverRepr,
246
247    /// The callback.
248    callback: unsafe extern "C" fn(obj: *const ObserverTraitObject, message: *const u8, count: usize, p: *const Point) -> *mut ErrorRepr,
249
250    /// The trait object.
251    trait_object: ObserverTraitObject
252}
253
254#[cfg(feature="dist_mode")]
255impl Drop for ObserverRepr {
256
257    fn drop(&mut self) {
258        unsafe {
259            (self.delete)(&mut self.trait_object);
260        }
261    }
262}
263
264#[cfg(feature="dist_mode")]
265impl Clone for ObserverRepr {
266
267    fn clone(&self) -> Self {
268        unsafe {
269            (self.clone)(&self.trait_object)
270        }
271    }
272}
273
274#[cfg(feature="dist_mode")]
275impl ObserverRepr {
276
277    /// Convert to a C-friendly representation.
278    #[inline]
279    pub fn into_repr(comp: ObserverBox<&[u8], ()>) -> ObserverRepr {
280        unsafe {
281            ObserverRepr {
282                delete: delete_observer_repr,
283                clone: clone_observer_repr,
284                callback: call_observer_repr,
285                trait_object: ObserverTraitObject {
286                    field1: mem::transmute(comp),
287                    field2: ptr::null_mut()
288                }
289            }
290        }
291    }
292
293    /// Call the representation.
294    #[inline]
295    fn call_repr(&self, m: &[u8], p: &Point) -> *mut ErrorRepr {
296        unsafe {
297            (self.callback)(&self.trait_object, m.as_ptr(), m.len(), p)
298        }
299    }
300}
301
302/// Call the `ObserverBox` representation.
303#[cfg(feature="dist_mode")]
304unsafe extern "C" fn call_observer_repr(comp: *const ObserverTraitObject, m: *const u8, count: usize, p: *const Point) -> *mut ErrorRepr {
305    let comp: ObserverBox<&[u8], ()> = mem::transmute((*comp).field1);
306    let m = slice::from_raw_parts(m, count);
307    match comp.call_observer(&m, &*p) {
308        Result::Ok(()) => {
309            mem::forget(comp);
310            ptr::null_mut()
311        },
312        Result::Err(e) => {
313            mem::forget(comp);
314            let e = ErrorRepr::new(e);
315            Box::into_raw(Box::new(e))
316        }
317    }
318}
319
320/// Clone the `ObserverBox` representation.
321#[cfg(feature="dist_mode")]
322unsafe extern "C" fn clone_observer_repr(comp: *const ObserverTraitObject) -> ObserverRepr {
323    let comp: ObserverBox<&[u8], ()> = mem::transmute((*comp).field1);
324    let x = ObserverRepr {
325        delete: delete_observer_repr,
326        clone: clone_observer_repr,
327        callback: call_observer_repr,
328        trait_object: ObserverTraitObject {
329            field1: mem::transmute(comp.clone()),
330            field2: ptr::null_mut()
331        }
332    };
333    mem::forget(comp);
334    x
335}
336
337/// Delete the `ObserverBox` representation.
338#[cfg(feature="dist_mode")]
339unsafe extern "C" fn delete_observer_repr(comp: *mut ObserverTraitObject) {
340    let _: ObserverBox<&[u8], ()> = mem::transmute((*comp).field1);
341}
342
343#[cfg(feature="dist_mode")]
344impl Observer for ObserverRepr {
345
346    type Message = Arc<ByteVecRepr>;
347
348    type Item = ();
349
350    #[doc(hidden)]
351    #[inline]
352    fn call_observer(&self, m: &Self::Message, p: &Point) -> simulation::Result<Self::Item> {
353        unsafe {
354            let m = m.slice();
355            let e = self.call_repr(m, p);
356            if e == ptr::null_mut() {
357                Result::Ok(())
358            } else {
359                let e = ffi_error_repr_into_error(e);
360                Result::Err(e)
361            }
362        }
363    }
364}
365
366/// Allows creating the `Observer` computation from a pure value.
367#[must_use = "computations are lazy and do nothing unless to be run"]
368#[derive(Clone)]
369pub struct Return<M, T> {
370
371    /// Return a pure value, which is then transformed to the computation.
372    val: T,
373
374    /// To keep the type parameter.
375    _phantom: PhantomData<M>
376}
377
378impl<M, T> Observer for Return<M, T>
379    where T: Clone
380{
381    type Message = M;
382    type Item = T;
383
384    #[doc(hidden)]
385    #[inline]
386    fn call_observer(&self, _: &M, _: &Point) -> simulation::Result<T> {
387        Result::Ok(self.val.clone())
388    }
389}
390
391/// Allows delaying the `Observer` computation by the specified function.
392#[must_use = "computations are lazy and do nothing unless to be run"]
393#[derive(Clone)]
394pub struct Delay<F, O> {
395
396    /// Return the computation.
397    f: F,
398
399    /// To keep the type parameter.
400    _phantom: PhantomData<O>
401}
402
403impl<F, O> Observer for Delay<F, O>
404    where F: Fn() -> O,
405          O: Observer
406{
407    type Message = O::Message;
408    type Item = O::Item;
409
410    #[doc(hidden)]
411    #[inline]
412    fn call_observer(&self, m: &O::Message, p: &Point) -> simulation::Result<O::Item> {
413        let Delay { f, _phantom } = self;
414        f().call_observer(m, p)
415    }
416}
417
418/// Allows constructing the `Observer` computation by the specified function.
419#[must_use = "computations are lazy and do nothing unless to be run"]
420#[derive(Clone)]
421pub struct Cons<F, M, T> {
422
423    /// The function of time point.
424    f: F,
425
426    /// To keep the type parameter.
427    _phantom1: PhantomData<M>,
428
429    /// To keep the type parameter.
430    _phantom2: PhantomData<T>
431}
432
433impl<F, M, T> Observer for Cons<F, M, T>
434    where F: Fn(&M, &Point) -> simulation::Result<T>
435{
436    type Message = M;
437    type Item = T;
438
439    #[doc(hidden)]
440    #[inline]
441    fn call_observer(&self, m: &M, p: &Point) -> simulation::Result<T> {
442        let Cons { f, _phantom1, _phantom2 } = self;
443        f(m, p)
444    }
445}
446
447/// Allows creating the `Observer` computation that returns the outer message.
448#[must_use = "computations are lazy and do nothing unless to be run"]
449#[derive(Clone)]
450pub struct Message<M> {
451
452    /// To keep the type parameter.
453    _phantom: PhantomData<M>
454}
455
456impl<M> Observer for Message<M>
457    where M: Clone
458{
459    type Message = M;
460    type Item = M;
461
462    #[doc(hidden)]
463    #[inline]
464    fn call_observer(&self, m: &Self::Message, _: &Point) -> simulation::Result<Self::Item> {
465        Result::Ok(m.clone())
466    }
467}
468
469/// The monadic bind for the `Observer` computation.
470#[must_use = "computations are lazy and do nothing unless to be run"]
471#[derive(Clone)]
472pub struct AndThen<O, U, F> {
473
474    /// The current computation.
475    comp: O,
476
477    /// The continuation of the current computation.
478    f: F,
479
480    /// To keep the type parameter.
481    _phantom: PhantomData<U>
482}
483
484impl<O, U, F> Observer for AndThen<O, U, F>
485    where O: Observer,
486          U: Observer<Message = O::Message>,
487          F: Fn(O::Item) -> U,
488{
489    type Message = U::Message;
490    type Item = U::Item;
491
492    #[doc(hidden)]
493    #[inline]
494    fn call_observer(&self, m: &Self::Message, p: &Point) -> simulation::Result<Self::Item> {
495        let AndThen { comp, f, _phantom } = self;
496        match comp.call_observer(m, p) {
497            Result::Ok(a) => {
498                let u = f(a);
499                u.call_observer(m, p)
500            },
501            Result::Err(e) => {
502                Result::Err(e)
503            }
504        }
505    }
506}
507
508/// The functor for the `Observer` computation.
509#[must_use = "computations are lazy and do nothing unless to be run"]
510#[derive(Clone)]
511pub struct Map<O, B, F> {
512
513    /// The current computation.
514    comp: O,
515
516    /// The transform.
517    f: F,
518
519    /// To keep the type parameter.
520    _phantom: PhantomData<B>
521}
522
523impl<O, B, F> Observer for Map<O, B, F>
524    where O: Observer,
525          F: Fn(O::Item) -> B,
526{
527    type Message = O::Message;
528    type Item = B;
529
530    #[doc(hidden)]
531    #[inline]
532    fn call_observer(&self, m: &Self::Message, p: &Point) -> simulation::Result<Self::Item> {
533        let Map { comp, f, _phantom } = self;
534        match comp.call_observer(m, p) {
535            Result::Ok(a) => Result::Ok(f(a)),
536            Result::Err(e) => Result::Err(e)
537        }
538    }
539}
540
541/// The zip of two `Observer` computations.
542#[must_use = "computations are lazy and do nothing unless to be run"]
543#[derive(Clone)]
544pub struct Zip<O, U> {
545
546    /// The current computation.
547    comp: O,
548
549    /// Another computation.
550    other: U,
551}
552
553impl<O, U> Observer for Zip<O, U>
554    where O: Observer,
555          U: Observer<Message = O::Message>
556{
557    type Message = O::Message;
558    type Item = (O::Item, U::Item);
559
560    #[doc(hidden)]
561    #[inline]
562    fn call_observer(&self, m: &Self::Message, p: &Point) -> simulation::Result<Self::Item> {
563        let Zip { comp, other } = self;
564        match comp.call_observer(m, p) {
565            Result::Ok(a) => {
566                match other.call_observer(m, p) {
567                    Result::Ok(b) => Result::Ok((a, b)),
568                    Result::Err(e) => Result::Err(e)
569                }
570            },
571            Result::Err(e) => Result::Err(e)
572        }
573    }
574}
575
576/// The function application for the `Observer` computation.
577#[must_use = "computations are lazy and do nothing unless to be run"]
578#[derive(Clone)]
579pub struct Ap<O, U, B> {
580
581    /// The current computation.
582    comp: O,
583
584    /// The continuation of the current computation.
585    other: U,
586
587    /// To keep the type parameter.
588    _phantom: PhantomData<B>
589}
590
591impl<O, U, B> Observer for Ap<O, U, B>
592    where O: Observer,
593          U: Observer<Message = O::Message>,
594          O::Item: Fn(U::Item) -> B,
595{
596    type Message = O::Message;
597    type Item = B;
598
599    #[doc(hidden)]
600    #[inline]
601    fn call_observer(&self, m: &Self::Message, p: &Point) -> simulation::Result<Self::Item> {
602        let Ap { comp, other, _phantom } = self;
603        match comp.call_observer(m, p) {
604            Result::Ok(f) => {
605                match other.call_observer(m, p) {
606                    Result::Ok(a) => Result::Ok(f(a)),
607                    Result::Err(e) => Result::Err(e)
608                }
609            },
610            Result::Err(e) => Result::Err(e)
611        }
612    }
613}
614
615/// Trace the computation.
616#[must_use = "computations are lazy and do nothing unless to be run"]
617#[derive(Clone)]
618pub struct Trace<O> {
619
620    /// The computation.
621    comp: O,
622
623    /// The message to print.
624    msg: String
625}
626
627impl<O> Observer for Trace<O>
628    where O: Observer
629{
630    type Message = O::Message;
631    type Item = O::Item;
632
633    #[doc(hidden)]
634    #[inline]
635    fn call_observer(&self, m: &Self::Message, p: &Point) -> simulation::Result<Self::Item> {
636        let Trace { comp, msg } = self;
637        p.trace(&msg);
638        comp.call_observer(m, p)
639    }
640}