1use std::marker::PhantomData;
8
9#[cfg(feature="dist_mode")]
10use std::sync::Arc;
11
12#[cfg(feature="dist_mode")]
13use std::slice;
14
15#[cfg(feature="dist_mode")]
16use std::mem;
17
18#[cfg(feature="dist_mode")]
19use std::ptr;
20
21#[cfg(feature="dist_mode")]
22use libc::*;
23
24use crate::simulation;
25use crate::simulation::point::Point;
26
27#[cfg(feature="dist_mode")]
28use crate::simulation::error::*;
29
30#[cfg(feature="dist_mode")]
31use crate::simulation::utils::byte_vec::*;
32
33use dvcompute_utils::grc::Grc;
34
35#[inline]
37pub fn return_observer<M, T>(val: T) -> Return<M, T>
38 where T: Clone
39{
40 Return { val: val, _phantom: PhantomData }
41}
42
43#[inline]
45pub fn delay_observer<F, O>(f: F) -> Delay<F, O>
46 where F: Fn() -> O,
47 O: Observer
48{
49 Delay { f: f, _phantom: PhantomData }
50}
51
52#[inline]
54pub fn cons_observer<F, M, T>(f: F) -> Cons<F, M, T>
55 where F: Fn(&M, &Point) -> simulation::Result<T>
56{
57 Cons { f: f, _phantom1: PhantomData, _phantom2: PhantomData }
58}
59
60#[inline]
62pub fn message_observer<M>() -> Message<M>
63 where M: Clone
64{
65 Message { _phantom: PhantomData }
66}
67
68#[inline]
70pub fn trace_observer<O>(msg: String, comp: O) -> Trace<O>
71 where O: Observer
72{
73 Trace { comp: comp, msg: msg}
74}
75
76pub trait Observer {
78
79 type Message;
81
82 type Item;
84
85 #[doc(hidden)]
87 fn call_observer(&self, m: &Self::Message, p: &Point) -> simulation::Result<Self::Item>;
88
89 #[inline]
91 fn and_then<U, F>(self, f: F) -> AndThen<Self, U, F>
92 where Self: Sized,
93 U: Observer<Message = Self::Message>,
94 F: Fn(Self::Item) -> U,
95 {
96 AndThen { comp: self, f: f, _phantom: PhantomData }
97 }
98
99 #[inline]
101 fn map<B, F>(self, f: F) -> Map<Self, B, F>
102 where Self: Sized,
103 F: Fn(Self::Item) -> B,
104 {
105 Map { comp: self, f: f, _phantom: PhantomData }
106 }
107
108 #[inline]
110 fn zip<U>(self, other: U) -> Zip<Self, U>
111 where Self: Sized,
112 U: Observer<Message = Self::Message>
113 {
114 Zip { comp: self, other: other }
115 }
116
117 #[inline]
119 fn ap<U, B>(self, other: U) -> Ap<Self, U, B>
120 where Self: Sized,
121 Self::Item: Fn(U::Item) -> B,
122 U: Observer<Message = Self::Message>
123 {
124 Ap { comp: self, other: other, _phantom: PhantomData }
125 }
126
127 #[inline]
129 fn into_boxed(self) -> ObserverBox<Self::Message, Self::Item>
130 where Self: Sized + Clone + 'static
131 {
132 ObserverBox::new(move |m: &Self::Message, p: &Point| {
133 self.call_observer(m, p)
134 })
135 }
136}
137
138pub trait IntoObserver {
140
141 type Observer: Observer<Message = Self::Message, Item = Self::Item>;
143
144 type Message;
146
147 type Item;
149
150 fn into_observer(self) -> Self::Observer;
152}
153
154impl<M: Observer> IntoObserver for M {
155
156 type Observer = M;
157
158 type Message = M::Message;
159
160 type Item = M::Item;
161
162 #[inline]
163 fn into_observer(self) -> Self::Observer {
164 self
165 }
166}
167
168#[must_use = "computations are lazy and do nothing unless to be run"]
170pub struct ObserverBox<M, T> where M: 'static, T: 'static {
171 f: Grc<Box<dyn Fn(&M, &Point) -> simulation::Result<T>>>
172}
173
174impl<M, T> ObserverBox<M, T> {
175
176 #[doc(hidden)]
178 #[inline]
179 fn new<F>(f: F) -> Self
180 where F: Fn(&M, &Point) -> simulation::Result<T> + Clone + 'static
181 {
182 ObserverBox {
183 f: Grc::new(Box::new(f))
184 }
185 }
186
187 #[doc(hidden)]
189 #[inline]
190 pub fn call(&self, m: &M, p: &Point) -> simulation::Result<T> {
191 let ObserverBox { f } = self;
192 f(m, p)
193 }
194}
195
196impl<M, T> Clone for ObserverBox<M, T> {
197
198 #[inline]
199 fn clone(&self) -> Self {
200 let f = &self.f;
201 ObserverBox {
202 f: f.clone()
203 }
204 }
205}
206
207impl<M, T> Observer for ObserverBox<M, T> {
208
209 type Message = M;
210 type Item = T;
211
212 #[doc(hidden)]
213 #[inline]
214 fn call_observer(&self, m: &M, p: &Point) -> simulation::Result<T> {
215 self.call(m, p)
216 }
217
218 #[inline]
219 fn into_boxed(self) -> ObserverBox<Self::Message, Self::Item>
220 where Self: Sized + Clone + 'static
221 {
222 self
223 }
224}
225
/// Two raw pointer-sized slots with C layout, used as the opaque payload of
/// [`ObserverRepr`].
///
/// In this file `field1` holds a transmuted `ObserverBox<&[u8], ()>` and
/// `field2` is always set to null.
#[cfg(feature = "dist_mode")]
#[repr(C)]
#[derive(Clone, Copy)]
struct ObserverTraitObject {
    /// First slot.
    field1: *mut c_void,

    /// Second slot.
    field2: *mut c_void,
}
235
/// A C-layout representation of an observer: three function pointers plus the
/// opaque payload they operate on.
#[cfg(feature = "dist_mode")]
#[repr(C)]
pub struct ObserverRepr {
    /// Releases the payload; invoked from `Drop`.
    delete: unsafe extern "C" fn(obj: *mut ObserverTraitObject),

    /// Produces a new representation from the payload; invoked from `Clone`.
    clone: unsafe extern "C" fn(obj: *const ObserverTraitObject) -> ObserverRepr,

    /// Runs the observer on a byte message (`message`, `count`) at point `p`.
    /// A null result means success; otherwise the pointer refers to an error.
    callback: unsafe extern "C" fn(
        obj: *const ObserverTraitObject,
        message: *const u8,
        count: usize,
        p: *const Point,
    ) -> *mut ErrorRepr,

    /// The payload passed to the function pointers above.
    trait_object: ObserverTraitObject,
}
253
#[cfg(feature = "dist_mode")]
impl Drop for ObserverRepr {
    /// Hands the payload to the stored `delete` function.
    fn drop(&mut self) {
        let delete = self.delete;
        // SAFETY: `delete` is called once, from `drop`, with a pointer to the
        // payload stored in this same value.
        unsafe { delete(&mut self.trait_object) }
    }
}
263
#[cfg(feature = "dist_mode")]
impl Clone for ObserverRepr {
    /// Delegates to the stored `clone` function pointer.
    fn clone(&self) -> Self {
        let clone = self.clone;
        // SAFETY: the pointer refers to the payload stored in this same value
        // and stays valid for the duration of the call.
        unsafe { clone(&self.trait_object) }
    }
}
273
#[cfg(feature = "dist_mode")]
impl ObserverRepr {
    /// Converts a boxed observer over byte slices into its C-layout representation.
    ///
    /// Ownership of `comp` moves into the payload; it is released by
    /// `delete_observer_repr` when the representation is dropped.
    #[inline]
    pub fn into_repr(comp: ObserverBox<&[u8], ()>) -> ObserverRepr {
        // SAFETY: `transmute` only compiles if `ObserverBox<&[u8], ()>` and a raw
        // pointer have the same size; the value is transmuted back to the same
        // type by the `*_observer_repr` functions.
        let field1: *mut c_void = unsafe { mem::transmute(comp) };
        let trait_object = ObserverTraitObject {
            field1,
            field2: ptr::null_mut(),
        };
        ObserverRepr {
            delete: delete_observer_repr,
            clone: clone_observer_repr,
            callback: call_observer_repr,
            trait_object,
        }
    }

    /// Invokes the stored callback with the message bytes `m` at point `p`.
    ///
    /// Returns the raw result of the callback: null on success, otherwise a
    /// pointer to an error representation.
    #[inline]
    fn call_repr(&self, m: &[u8], p: &Point) -> *mut ErrorRepr {
        let callback = self.callback;
        let obj: *const ObserverTraitObject = &self.trait_object;
        // SAFETY: every pointer passed is derived from a live reference that
        // outlives the call.
        unsafe { callback(obj, m.as_ptr(), m.len(), p) }
    }
}
301
/// Callback installed in [`ObserverRepr`]: runs the boxed observer stored in
/// `field1` on the byte message `(m, count)` at point `p`.
///
/// Returns null when the observer succeeds; otherwise returns a pointer to a
/// heap-allocated `ErrorRepr` created with `Box::into_raw`.
///
/// # Safety
///
/// `comp` must point to a payload whose `field1` was produced by transmuting an
/// `ObserverBox<&[u8], ()>`, `m` must be valid for reading `count` bytes, and
/// `p` must point to a valid `Point`.
#[cfg(feature = "dist_mode")]
unsafe extern "C" fn call_observer_repr(
    comp: *const ObserverTraitObject,
    m: *const u8,
    count: usize,
    p: *const Point,
) -> *mut ErrorRepr {
    // The payload is only borrowed here. Wrapping the transmuted handle in
    // `ManuallyDrop` guarantees it is never dropped on any exit path (including
    // unwinding), instead of relying on a `mem::forget` in each match arm.
    let comp: mem::ManuallyDrop<ObserverBox<&[u8], ()>> =
        mem::ManuallyDrop::new(mem::transmute((*comp).field1));
    let m = slice::from_raw_parts(m, count);
    match comp.call_observer(&m, &*p) {
        Result::Ok(()) => ptr::null_mut(),
        Result::Err(e) => Box::into_raw(Box::new(ErrorRepr::new(e))),
    }
}
319
/// Clone function installed in [`ObserverRepr`]: clones the boxed observer
/// stored in `field1` and wraps the copy in a fresh representation.
///
/// # Safety
///
/// `comp` must point to a payload whose `field1` was produced by transmuting an
/// `ObserverBox<&[u8], ()>`.
#[cfg(feature = "dist_mode")]
unsafe extern "C" fn clone_observer_repr(comp: *const ObserverTraitObject) -> ObserverRepr {
    // The source payload stays owned by its representation, so the transmuted
    // handle must not be dropped here.
    let source: mem::ManuallyDrop<ObserverBox<&[u8], ()>> =
        mem::ManuallyDrop::new(mem::transmute((*comp).field1));
    let copy: ObserverBox<&[u8], ()> = (*source).clone();
    ObserverRepr {
        delete: delete_observer_repr,
        clone: clone_observer_repr,
        callback: call_observer_repr,
        trait_object: ObserverTraitObject {
            field1: mem::transmute(copy),
            field2: ptr::null_mut(),
        },
    }
}
336
/// Delete function installed in [`ObserverRepr`]: reconstitutes the boxed
/// observer stored in `field1` and drops it.
///
/// # Safety
///
/// `comp` must point to a payload whose `field1` was produced by transmuting an
/// `ObserverBox<&[u8], ()>`, and that payload must not be used afterwards.
#[cfg(feature = "dist_mode")]
unsafe extern "C" fn delete_observer_repr(comp: *mut ObserverTraitObject) {
    let owned: ObserverBox<&[u8], ()> = mem::transmute((*comp).field1);
    drop(owned);
}
342
#[cfg(feature = "dist_mode")]
impl Observer for ObserverRepr {
    type Message = Arc<ByteVecRepr>;

    type Item = ();

    /// Passes the message bytes to the stored callback and converts a non-null
    /// error pointer back into an error value.
    #[doc(hidden)]
    #[inline]
    fn call_observer(&self, m: &Self::Message, p: &Point) -> simulation::Result<Self::Item> {
        unsafe {
            let err = self.call_repr(m.slice(), p);
            if err.is_null() {
                return Result::Ok(());
            }
            Result::Err(ffi_error_repr_into_error(err))
        }
    }
}
365
366#[must_use = "computations are lazy and do nothing unless to be run"]
368#[derive(Clone)]
369pub struct Return<M, T> {
370
371 val: T,
373
374 _phantom: PhantomData<M>
376}
377
378impl<M, T> Observer for Return<M, T>
379 where T: Clone
380{
381 type Message = M;
382 type Item = T;
383
384 #[doc(hidden)]
385 #[inline]
386 fn call_observer(&self, _: &M, _: &Point) -> simulation::Result<T> {
387 Result::Ok(self.val.clone())
388 }
389}
390
391#[must_use = "computations are lazy and do nothing unless to be run"]
393#[derive(Clone)]
394pub struct Delay<F, O> {
395
396 f: F,
398
399 _phantom: PhantomData<O>
401}
402
403impl<F, O> Observer for Delay<F, O>
404 where F: Fn() -> O,
405 O: Observer
406{
407 type Message = O::Message;
408 type Item = O::Item;
409
410 #[doc(hidden)]
411 #[inline]
412 fn call_observer(&self, m: &O::Message, p: &Point) -> simulation::Result<O::Item> {
413 let Delay { f, _phantom } = self;
414 f().call_observer(m, p)
415 }
416}
417
418#[must_use = "computations are lazy and do nothing unless to be run"]
420#[derive(Clone)]
421pub struct Cons<F, M, T> {
422
423 f: F,
425
426 _phantom1: PhantomData<M>,
428
429 _phantom2: PhantomData<T>
431}
432
433impl<F, M, T> Observer for Cons<F, M, T>
434 where F: Fn(&M, &Point) -> simulation::Result<T>
435{
436 type Message = M;
437 type Item = T;
438
439 #[doc(hidden)]
440 #[inline]
441 fn call_observer(&self, m: &M, p: &Point) -> simulation::Result<T> {
442 let Cons { f, _phantom1, _phantom2 } = self;
443 f(m, p)
444 }
445}
446
447#[must_use = "computations are lazy and do nothing unless to be run"]
449#[derive(Clone)]
450pub struct Message<M> {
451
452 _phantom: PhantomData<M>
454}
455
456impl<M> Observer for Message<M>
457 where M: Clone
458{
459 type Message = M;
460 type Item = M;
461
462 #[doc(hidden)]
463 #[inline]
464 fn call_observer(&self, m: &Self::Message, _: &Point) -> simulation::Result<Self::Item> {
465 Result::Ok(m.clone())
466 }
467}
468
469#[must_use = "computations are lazy and do nothing unless to be run"]
471#[derive(Clone)]
472pub struct AndThen<O, U, F> {
473
474 comp: O,
476
477 f: F,
479
480 _phantom: PhantomData<U>
482}
483
484impl<O, U, F> Observer for AndThen<O, U, F>
485 where O: Observer,
486 U: Observer<Message = O::Message>,
487 F: Fn(O::Item) -> U,
488{
489 type Message = U::Message;
490 type Item = U::Item;
491
492 #[doc(hidden)]
493 #[inline]
494 fn call_observer(&self, m: &Self::Message, p: &Point) -> simulation::Result<Self::Item> {
495 let AndThen { comp, f, _phantom } = self;
496 match comp.call_observer(m, p) {
497 Result::Ok(a) => {
498 let u = f(a);
499 u.call_observer(m, p)
500 },
501 Result::Err(e) => {
502 Result::Err(e)
503 }
504 }
505 }
506}
507
508#[must_use = "computations are lazy and do nothing unless to be run"]
510#[derive(Clone)]
511pub struct Map<O, B, F> {
512
513 comp: O,
515
516 f: F,
518
519 _phantom: PhantomData<B>
521}
522
523impl<O, B, F> Observer for Map<O, B, F>
524 where O: Observer,
525 F: Fn(O::Item) -> B,
526{
527 type Message = O::Message;
528 type Item = B;
529
530 #[doc(hidden)]
531 #[inline]
532 fn call_observer(&self, m: &Self::Message, p: &Point) -> simulation::Result<Self::Item> {
533 let Map { comp, f, _phantom } = self;
534 match comp.call_observer(m, p) {
535 Result::Ok(a) => Result::Ok(f(a)),
536 Result::Err(e) => Result::Err(e)
537 }
538 }
539}
540
541#[must_use = "computations are lazy and do nothing unless to be run"]
543#[derive(Clone)]
544pub struct Zip<O, U> {
545
546 comp: O,
548
549 other: U,
551}
552
553impl<O, U> Observer for Zip<O, U>
554 where O: Observer,
555 U: Observer<Message = O::Message>
556{
557 type Message = O::Message;
558 type Item = (O::Item, U::Item);
559
560 #[doc(hidden)]
561 #[inline]
562 fn call_observer(&self, m: &Self::Message, p: &Point) -> simulation::Result<Self::Item> {
563 let Zip { comp, other } = self;
564 match comp.call_observer(m, p) {
565 Result::Ok(a) => {
566 match other.call_observer(m, p) {
567 Result::Ok(b) => Result::Ok((a, b)),
568 Result::Err(e) => Result::Err(e)
569 }
570 },
571 Result::Err(e) => Result::Err(e)
572 }
573 }
574}
575
576#[must_use = "computations are lazy and do nothing unless to be run"]
578#[derive(Clone)]
579pub struct Ap<O, U, B> {
580
581 comp: O,
583
584 other: U,
586
587 _phantom: PhantomData<B>
589}
590
591impl<O, U, B> Observer for Ap<O, U, B>
592 where O: Observer,
593 U: Observer<Message = O::Message>,
594 O::Item: Fn(U::Item) -> B,
595{
596 type Message = O::Message;
597 type Item = B;
598
599 #[doc(hidden)]
600 #[inline]
601 fn call_observer(&self, m: &Self::Message, p: &Point) -> simulation::Result<Self::Item> {
602 let Ap { comp, other, _phantom } = self;
603 match comp.call_observer(m, p) {
604 Result::Ok(f) => {
605 match other.call_observer(m, p) {
606 Result::Ok(a) => Result::Ok(f(a)),
607 Result::Err(e) => Result::Err(e)
608 }
609 },
610 Result::Err(e) => Result::Err(e)
611 }
612 }
613}
614
615#[must_use = "computations are lazy and do nothing unless to be run"]
617#[derive(Clone)]
618pub struct Trace<O> {
619
620 comp: O,
622
623 msg: String
625}
626
627impl<O> Observer for Trace<O>
628 where O: Observer
629{
630 type Message = O::Message;
631 type Item = O::Item;
632
633 #[doc(hidden)]
634 #[inline]
635 fn call_observer(&self, m: &Self::Message, p: &Point) -> simulation::Result<Self::Item> {
636 let Trace { comp, msg } = self;
637 p.trace(&msg);
638 comp.call_observer(m, p)
639 }
640}