dvcompute_dist/simulation/observable/
source.rs

1// Copyright (c) 2020-2022  David Sorokin <davsor@mail.ru>, based in Yoshkar-Ola, Russia
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at https://mozilla.org/MPL/2.0/.
6
7use std::ops::Deref;
8
9use crate::simulation;
10use crate::simulation::point::Point;
11use crate::simulation::observable::*;
12use crate::simulation::observable::disposable::*;
13use crate::simulation::observable::observer::*;
14use crate::simulation::event::Event;
15use crate::simulation::event::EventBox;
16use crate::simulation::event::cons_event;
17use crate::simulation::ref_comp::RefComp;
18
19use dvcompute_utils::grc::Grc;
20use dvcompute_utils::grc::Weak;
21
22/// The source of `Observable` computation.
23#[derive(Clone)]
24pub struct ObservableSource<M> where M: 'static {
25
26    /// The list of observers.
27    observers: Grc<RefComp<ObserverList<M>>>
28}
29
30impl<M: 'static> ObservableSource<M> {
31
32    /// Create a new source of observable messages.
33    pub fn new() -> ObservableSource<M> {
34        ObservableSource {
35            observers: Grc::new(RefComp::new(ObserverList::Nil))
36        }
37    }
38
39    /// Trigger the message at the specified time point.
40    pub fn trigger_at(&self, message: &M, p: &Point) -> simulation::Result<()> {
41        let observers = self.observers.read_at(p);
42        trigger_observers(&observers, message, p)
43    }
44
45    /// Trigger the message within the `Event` computation.
46    pub fn trigger(&self, message: M) -> Trigger<M> {
47        Trigger { observers: self.observers.clone(), message: message }
48    }
49
50    /// Publish the observable.
51    pub fn publish(&self) -> Publish<M> where M: 'static {
52        Publish { observers: Grc::downgrade(&self.observers) }
53    }
54}
55
56/// The trigger computation by cloning the reference.
57#[must_use = "computations are lazy and do nothing unless to be run"]
58#[derive(Clone)]
59pub struct Trigger<M> where M: 'static {
60
61    /// The source of observable messages.
62    observers: Grc<RefComp<ObserverList<M>>>,
63
64    /// The message.
65    message: M
66}
67
68impl<M: 'static> Event for Trigger<M> {
69
70    type Item = ();
71
72    #[doc(hidden)]
73    #[inline]
74    fn call_event(self, p: &Point) -> simulation::Result<()> {
75        let Trigger { observers, message } = self;
76        let observers = observers.read_at(p);
77        trigger_observers(&observers, &message, p)
78    }
79}
80
81/// The publishing computation.
82#[must_use = "computations are lazy and do nothing unless to be run"]
83#[derive(Clone)]
84pub struct Publish<M> where M: 'static {
85
86    /// The source of observable messages.
87    observers: Weak<RefComp<ObserverList<M>>>
88}
89
90impl<M> Observable for Publish<M> where M: 'static {
91
92    type Message = M;
93
94    #[inline]
95    fn subscribe<O>(self, observer: O) -> EventBox<DisposableBox>
96        where O: Observer<Message = Self::Message, Item = ()> + Clone + 'static
97    {
98        let Publish { observers } = self;
99        cons_event(move |p: &Point| {
100            match observers.upgrade() {
101                None => {
102                    Result::Ok(empty_disposable().into_boxed())
103                },
104                Some(observers) => {
105                    let observer   = Grc::new(observer.into_boxed());
106                    let disposable = SourceDisposable {
107                        observers: Grc::downgrade(&observers),
108                        observer: Grc::downgrade(&observer)
109                    };
110                    let disposable = disposable.into_boxed();
111                    let observers0 = observers.read_at(p);
112                    let observers2 = add_observer(observer, Grc::new(observers0));
113                    observers.write_at(observers2, p);
114
115                    Result::Ok(disposable)
116                }
117            }
118        }).into_boxed()
119    }
120}
121
122/// The disposable object.
123struct SourceDisposable<M> where M: 'static {
124
125    /// The source of observable messages.
126    observers: Weak<RefComp<ObserverList<M>>>,
127
128    /// The observer that should be disposed.
129    observer: Weak<ObserverBox<M, ()>>
130}
131
132impl<M> Disposable for SourceDisposable<M>
133    where M: 'static
134{
135    #[doc(hidden)]
136    #[inline]
137    fn dispose(self, p: &Point) -> simulation::Result<()> {
138        let SourceDisposable { observers, observer } = self;
139        match observers.upgrade() {
140            None => (),
141            Some(observers) => {
142                match observer.upgrade() {
143                    None => (),
144                    Some(observer) => {
145                        let observers0 = observers.read_at(p);
146                        let observers2 = delete_observer(observer, &observers0);
147                        observers.write_at(observers2, p);
148                    }
149                }
150            }
151        }
152        Result::Ok(())
153    }
154}
155
156impl<M> Clone for SourceDisposable<M> {
157
158    fn clone(&self) -> Self {
159        SourceDisposable {
160            observers: self.observers.clone(),
161            observer: self.observer.clone()
162        }
163    }
164}
165
166/// The list of observers.
167enum ObserverList<M> where M: 'static {
168
169    /// The cons cell.
170    Cons(Grc<ObserverBox<M, ()>>, Grc<ObserverList<M>>),
171
172    /// An empty list.
173    Nil
174}
175
176impl<M> Clone for ObserverList<M> {
177
178    fn clone(&self) -> Self {
179        match self {
180            &ObserverList::Cons(ref observer, ref tail) => {
181                ObserverList::Cons(observer.clone(), tail.clone())
182            },
183            &ObserverList::Nil => ObserverList::Nil
184        }
185    }
186}
187
188/// Trigger the observers passing in the specified message to them.
189fn trigger_observers<M>(observers: &ObserverList<M>, m: &M, p: &Point) -> simulation::Result<()> {
190    let mut observers = observers;
191    loop {
192        match observers {
193            &ObserverList::Nil => return Result::Ok(()),
194            &ObserverList::Cons(ref observer, ref tail) => {
195                match observer.call(m, p) {
196                    Result::Ok(()) => (),
197                    e@Result::Err(_) => return e
198                }
199                observers = tail.deref();
200            }
201        }
202    }
203}
204
205/// Add a new observer to the list.
206#[inline]
207fn add_observer<M>(observer: Grc<ObserverBox<M, ()>>, observers: Grc<ObserverList<M>>) -> ObserverList<M> {
208    ObserverList::Cons(observer, observers)
209}
210
211/// Remove the specified observer from the list.
212fn delete_observer<M>(observer: Grc<ObserverBox<M, ()>>, observers: &ObserverList<M>) -> ObserverList<M> {
213    let init          = observers;
214    let mut first     = ObserverList::Nil;
215    let mut observers = observers;
216    loop {
217        match observers {
218            &ObserverList::Nil => {
219                return init.clone();
220            },
221            &ObserverList::Cons(ref head, ref tail) if Grc::ptr_eq(&observer, head) => {
222                return append_rev_observers(&first, tail.clone());
223            },
224            &ObserverList::Cons(ref head, ref tail) => {
225                first     = ObserverList::Cons(head.clone(), Grc::new(first));
226                observers = tail.deref();
227            }
228        }
229    }
230}
231
232/// Append the reversed observers to the list tail.
233fn append_rev_observers<M>(observers: &ObserverList<M>, tail: Grc<ObserverList<M>>) -> ObserverList<M> {
234    let mut result    = tail;
235    let mut observers = observers;
236    loop {
237        match observers {
238            &ObserverList::Nil => {
239                return (*result).clone(); // it is almost impossible
240            },
241            &ObserverList::Cons(ref observer, ref tail) => {
242                observers = tail.deref();
243                let list  = ObserverList::Cons(observer.clone(), result);
244                match observers {
245                    &ObserverList::Nil => {
246                        return list;
247                    },
248                    &ObserverList::Cons(_, _) => {
249                        result = Grc::new(list);
250                    }
251                }
252            }
253        }
254    }
255}