dvcompute_dist/simulation/observable/
source.rs1use std::ops::Deref;
8
9use crate::simulation;
10use crate::simulation::point::Point;
11use crate::simulation::observable::*;
12use crate::simulation::observable::disposable::*;
13use crate::simulation::observable::observer::*;
14use crate::simulation::event::Event;
15use crate::simulation::event::EventBox;
16use crate::simulation::event::cons_event;
17use crate::simulation::ref_comp::RefComp;
18
19use dvcompute_utils::grc::Grc;
20use dvcompute_utils::grc::Weak;
21
22#[derive(Clone)]
24pub struct ObservableSource<M> where M: 'static {
25
26 observers: Grc<RefComp<ObserverList<M>>>
28}
29
30impl<M: 'static> ObservableSource<M> {
31
32 pub fn new() -> ObservableSource<M> {
34 ObservableSource {
35 observers: Grc::new(RefComp::new(ObserverList::Nil))
36 }
37 }
38
39 pub fn trigger_at(&self, message: &M, p: &Point) -> simulation::Result<()> {
41 let observers = self.observers.read_at(p);
42 trigger_observers(&observers, message, p)
43 }
44
45 pub fn trigger(&self, message: M) -> Trigger<M> {
47 Trigger { observers: self.observers.clone(), message: message }
48 }
49
50 pub fn publish(&self) -> Publish<M> where M: 'static {
52 Publish { observers: Grc::downgrade(&self.observers) }
53 }
54}
55
56#[must_use = "computations are lazy and do nothing unless to be run"]
58#[derive(Clone)]
59pub struct Trigger<M> where M: 'static {
60
61 observers: Grc<RefComp<ObserverList<M>>>,
63
64 message: M
66}
67
68impl<M: 'static> Event for Trigger<M> {
69
70 type Item = ();
71
72 #[doc(hidden)]
73 #[inline]
74 fn call_event(self, p: &Point) -> simulation::Result<()> {
75 let Trigger { observers, message } = self;
76 let observers = observers.read_at(p);
77 trigger_observers(&observers, &message, p)
78 }
79}
80
81#[must_use = "computations are lazy and do nothing unless to be run"]
83#[derive(Clone)]
84pub struct Publish<M> where M: 'static {
85
86 observers: Weak<RefComp<ObserverList<M>>>
88}
89
90impl<M> Observable for Publish<M> where M: 'static {
91
92 type Message = M;
93
94 #[inline]
95 fn subscribe<O>(self, observer: O) -> EventBox<DisposableBox>
96 where O: Observer<Message = Self::Message, Item = ()> + Clone + 'static
97 {
98 let Publish { observers } = self;
99 cons_event(move |p: &Point| {
100 match observers.upgrade() {
101 None => {
102 Result::Ok(empty_disposable().into_boxed())
103 },
104 Some(observers) => {
105 let observer = Grc::new(observer.into_boxed());
106 let disposable = SourceDisposable {
107 observers: Grc::downgrade(&observers),
108 observer: Grc::downgrade(&observer)
109 };
110 let disposable = disposable.into_boxed();
111 let observers0 = observers.read_at(p);
112 let observers2 = add_observer(observer, Grc::new(observers0));
113 observers.write_at(observers2, p);
114
115 Result::Ok(disposable)
116 }
117 }
118 }).into_boxed()
119 }
120}
121
122struct SourceDisposable<M> where M: 'static {
124
125 observers: Weak<RefComp<ObserverList<M>>>,
127
128 observer: Weak<ObserverBox<M, ()>>
130}
131
132impl<M> Disposable for SourceDisposable<M>
133 where M: 'static
134{
135 #[doc(hidden)]
136 #[inline]
137 fn dispose(self, p: &Point) -> simulation::Result<()> {
138 let SourceDisposable { observers, observer } = self;
139 match observers.upgrade() {
140 None => (),
141 Some(observers) => {
142 match observer.upgrade() {
143 None => (),
144 Some(observer) => {
145 let observers0 = observers.read_at(p);
146 let observers2 = delete_observer(observer, &observers0);
147 observers.write_at(observers2, p);
148 }
149 }
150 }
151 }
152 Result::Ok(())
153 }
154}
155
156impl<M> Clone for SourceDisposable<M> {
157
158 fn clone(&self) -> Self {
159 SourceDisposable {
160 observers: self.observers.clone(),
161 observer: self.observer.clone()
162 }
163 }
164}
165
166enum ObserverList<M> where M: 'static {
168
169 Cons(Grc<ObserverBox<M, ()>>, Grc<ObserverList<M>>),
171
172 Nil
174}
175
176impl<M> Clone for ObserverList<M> {
177
178 fn clone(&self) -> Self {
179 match self {
180 &ObserverList::Cons(ref observer, ref tail) => {
181 ObserverList::Cons(observer.clone(), tail.clone())
182 },
183 &ObserverList::Nil => ObserverList::Nil
184 }
185 }
186}
187
188fn trigger_observers<M>(observers: &ObserverList<M>, m: &M, p: &Point) -> simulation::Result<()> {
190 let mut observers = observers;
191 loop {
192 match observers {
193 &ObserverList::Nil => return Result::Ok(()),
194 &ObserverList::Cons(ref observer, ref tail) => {
195 match observer.call(m, p) {
196 Result::Ok(()) => (),
197 e@Result::Err(_) => return e
198 }
199 observers = tail.deref();
200 }
201 }
202 }
203}
204
205#[inline]
207fn add_observer<M>(observer: Grc<ObserverBox<M, ()>>, observers: Grc<ObserverList<M>>) -> ObserverList<M> {
208 ObserverList::Cons(observer, observers)
209}
210
211fn delete_observer<M>(observer: Grc<ObserverBox<M, ()>>, observers: &ObserverList<M>) -> ObserverList<M> {
213 let init = observers;
214 let mut first = ObserverList::Nil;
215 let mut observers = observers;
216 loop {
217 match observers {
218 &ObserverList::Nil => {
219 return init.clone();
220 },
221 &ObserverList::Cons(ref head, ref tail) if Grc::ptr_eq(&observer, head) => {
222 return append_rev_observers(&first, tail.clone());
223 },
224 &ObserverList::Cons(ref head, ref tail) => {
225 first = ObserverList::Cons(head.clone(), Grc::new(first));
226 observers = tail.deref();
227 }
228 }
229 }
230}
231
232fn append_rev_observers<M>(observers: &ObserverList<M>, tail: Grc<ObserverList<M>>) -> ObserverList<M> {
234 let mut result = tail;
235 let mut observers = observers;
236 loop {
237 match observers {
238 &ObserverList::Nil => {
239 return (*result).clone(); },
241 &ObserverList::Cons(ref observer, ref tail) => {
242 observers = tail.deref();
243 let list = ObserverList::Cons(observer.clone(), result);
244 match observers {
245 &ObserverList::Nil => {
246 return list;
247 },
248 &ObserverList::Cons(_, _) => {
249 result = Grc::new(list);
250 }
251 }
252 }
253 }
254 }
255}