1use std::rc::Rc;
8use std::marker::PhantomData;
9use std::ops::Deref;
10
11use crate::simulation;
12use crate::simulation::Point;
13use crate::simulation::error::*;
14use crate::simulation::event::*;
15use crate::simulation::{Run as SimulationRun};
16use crate::simulation::simulation::Simulation;
17use crate::simulation::observable::*;
18use crate::simulation::observable::observer::*;
19use crate::simulation::observable::source::*;
20use crate::simulation::observable::disposable::*;
21use crate::simulation::ref_comp::RefComp;
22
23use dvcompute_utils::grc::Grc;
24
25pub mod random;
27
28pub mod ops;
30
31#[inline]
33pub fn return_process<T>(val: T) -> Return<T> {
34 Return { val: val }
35}
36
37#[inline]
39pub fn panic_process<T>(msg: String) -> Panic<T> {
40 Panic { msg: msg, _phantom: PhantomData }
41}
42
43#[inline]
45pub fn delay_process<F, M>(f: F) -> Delay<F, M>
46 where F: FnOnce() -> M + 'static,
47 M: Process + 'static
48{
49 Delay { f: f, _phantom: PhantomData }
50}
51
52#[inline]
54pub fn process_id() -> Id {
55 Id {}
56}
57
58#[inline]
60pub fn cancel_process<T>() -> Cancel<T> {
61 Cancel { _phantom: PhantomData }
62}
63
64#[inline]
66pub fn hold_process(dt: f64) -> Hold {
67 Hold { dt: dt }
68}
69
70#[inline]
72pub fn passivate_process() -> Passivate {
73 Passivate {}
74}
75
76#[inline]
78pub fn passivate_process_before<M>(comp: M) -> PassivateBefore<M>
79 where M: Event<Item = ()>
80{
81 PassivateBefore { comp: comp }
82}
83
84#[inline]
86pub fn loop_process<F, M>(f: F) -> Loop<F, M>
87 where F: Fn() -> M + 'static,
88 M: Process<Item = ()> + 'static
89{
90 Loop { f: f, _phantom: PhantomData }
91}
92
93#[inline]
96pub fn spawn_process<M>(comp: M) -> Spawn<M>
97 where M: Process<Item = ()> + 'static
98{
99 spawn_process_with(ProcessCancellation::CancelTogether, comp)
100}
101
102#[inline]
105pub fn spawn_process_using_id<M>(pid: Grc<ProcessId>, comp: M) -> SpawnUsingId<M>
106 where M: Process<Item = ()> + 'static
107{
108 spawn_process_using_id_with(ProcessCancellation::CancelTogether, pid, comp)
109}
110
111#[inline]
114pub fn spawn_process_with<M>(cancellation: ProcessCancellation, comp: M) -> Spawn<M>
115 where M: Process<Item = ()> + 'static
116{
117 Spawn { cancellation: cancellation, comp: comp }
118}
119
120#[inline]
123pub fn spawn_process_using_id_with<M>(cancellation: ProcessCancellation, pid: Grc<ProcessId>, comp: M) -> SpawnUsingId<M>
124 where M: Process<Item = ()> + 'static
125{
126 SpawnUsingId { cancellation: cancellation, comp: comp, comp_id: pid }
127}
128
129#[inline]
131pub fn process_await<O>(observable: O) -> Await<O, O::Message>
132 where O: Observable,
133 O::Message: Clone + 'static
134{
135 Await { observable: observable, _phantom: PhantomData }
136}
137
138#[inline]
140pub fn process_await_result<O, M>(observable: O) -> AwaitResult<O, M>
141 where O: Observable<Message = simulation::Result<M>>,
142 M: Clone + 'static
143{
144 AwaitResult { observable: observable, _phantom: PhantomData }
145}
146
147#[inline]
149pub fn timeout_process<M>(timeout: f64, comp: M) -> Timeout<M>
150 where M: Process + 'static,
151 M::Item: Clone
152{
153 Timeout { timeout: timeout, comp: comp }
154}
155
156#[inline]
158pub fn timeout_process_using_id<M>(timeout: f64, pid: Grc<ProcessId>, comp: M) -> TimeoutUsingId<M>
159 where M: Process + 'static,
160 M::Item: Clone
161{
162 TimeoutUsingId { timeout: timeout, comp: comp, comp_id: pid }
163}
164
165#[inline]
167pub fn transfer_process<T, M>(comp: M) -> Transfer<T, M>
168 where M: Process<Item = ()>
169{
170 Transfer { comp: comp, _phantom: PhantomData }
171}
172
173#[inline]
175pub fn never_process<T>() -> Never<T> {
176 Never { _phantom: PhantomData }
177}
178
179#[inline]
181pub fn when_cancelling_process<M>(comp: M) -> WhenCancelling<M>
182 where M: Observer<Message = (), Item = ()> + 'static
183{
184 WhenCancelling { comp: comp }
185}
186
187#[inline]
189pub fn embed_result_process<T>(val: simulation::Result<T>) -> EmbedResult<T> {
190 EmbedResult { val: val }
191}
192
193#[inline]
195pub fn process_sequence<I, M>(comps: I) -> Sequence<I::IntoIter, M>
196 where I: IntoIterator<Item = M> + 'static,
197 M: Process + 'static
198{
199 let comps = comps.into_iter();
200 let acc = {
201 match comps.size_hint() {
202 (_, Some(n)) => Vec::with_capacity(n),
203 (_, None) => Vec::new()
204 }
205 };
206 Sequence { comps: comps, acc: acc }
207}
208
209#[inline]
211pub fn process_sequence_<I, M>(comps: I) -> Sequence_<I::IntoIter, M>
212 where I: IntoIterator<Item = M> + 'static,
213 M: Process + 'static
214{
215 Sequence_ { comps: comps.into_iter(), _phantom: PhantomData }
216}
217
218#[inline]
220pub fn process_with_priority(priority: isize) -> ProcessWithPriority {
221 ProcessWithPriority { priority: priority }
222}
223
224#[inline]
226pub fn restore_process_priority<M>(priority: isize, comp: M) -> impl Process<Item = M::Item>
227 where M: Process + 'static
228{
229 process_with_priority(priority).and_then(|()| { comp })
230}
231
232#[inline]
234pub fn trace_process<M>(msg: String, comp: M) -> Trace<M>
235 where M: Process
236{
237 Trace { comp: comp, msg: msg}
238}
239
240#[doc(hidden)]
242#[inline]
243pub fn is_process_cancelled(pid: &ProcessId, p: &Point) -> bool {
244 pid.is_cancel_activated(p)
245}
246
247#[doc(hidden)]
249pub fn revoke_process<T, C>(cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
250 where C: FnOnce(simulation::Result<T>, Grc<ProcessId>, &Point) -> simulation::Result<()>
251{
252 pid.deactivate_cancel(p);
253 cont(Result::Err(Error::Cancel), pid, p)
254}
255
256#[doc(hidden)]
258pub fn revoke_process_boxed<T>(cont: ProcessBoxCont<T>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
259 pid.deactivate_cancel(p);
260 cont.call_box((Result::Err(Error::Cancel), pid, p))
261}
262
263#[doc(hidden)]
265pub fn cut_error_process<T, C>(_cont: C, _pid: Grc<ProcessId>, err: Rc<OtherError>, _p: &Point) -> simulation::Result<()>
266 where C: FnOnce(simulation::Result<T>, Grc<ProcessId>, &Point) -> simulation::Result<()>
267{
268 Result::Err(Error::Other(err))
269}
270
271#[doc(hidden)]
273pub fn cut_error_process_boxed<T>(_cont: ProcessBoxCont<T>, _pid: Grc<ProcessId>, err: Rc<OtherError>, _p: &Point) -> simulation::Result<()> {
274 Result::Err(Error::Other(err))
275}
276
277#[doc(hidden)]
279pub fn propagate_error_process<T, C>(cont: C, pid: Grc<ProcessId>, err: Rc<OtherError>, p: &Point) -> simulation::Result<()>
280 where C: FnOnce(simulation::Result<T>, Grc<ProcessId>, &Point) -> simulation::Result<()>
281{
282 cont(Result::Err(Error::Other(err)), pid, p)
283}
284
285#[doc(hidden)]
287pub fn propagate_error_process_boxed<T>(cont: ProcessBoxCont<T>, pid: Grc<ProcessId>, err: Rc<OtherError>, p: &Point) -> simulation::Result<()> {
288 cont.call_box((Result::Err(Error::Other(err)), pid, p))
289}
290
291#[doc(hidden)]
293#[inline]
294pub fn resume_process<T, C>(cont: C, pid: Grc<ProcessId>, t: T, p: &Point) -> simulation::Result<()>
295 where C: FnOnce(simulation::Result<T>, Grc<ProcessId>, &Point) -> simulation::Result<()>
296{
297 if is_process_cancelled(&pid, p) {
298 revoke_process(cont, pid, p)
299 } else {
300 cont(Result::Ok(t), pid, p)
301 }
302}
303
304#[doc(hidden)]
306#[inline]
307pub fn resume_process_boxed<T>(cont: ProcessBoxCont<T>, pid: Grc<ProcessId>, t: T, p: &Point) -> simulation::Result<()> {
308 if is_process_cancelled(&pid, p) {
309 revoke_process_boxed(cont, pid, p)
310 } else {
311 cont.call_box((Result::Ok(t), pid, p))
312 }
313}
314
/// Policy that decides how cancellation is linked between a parent process
/// and a child process (see `ProcessId::connect_cancel`).
///
/// The type is a plain field-less enum, so it is `Copy` and `Debug` in
/// addition to the comparison traits.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum ProcessCancellation {

    /// Cancelling either process initiates the cancellation of the other.
    CancelTogether,

    /// Cancelling the parent initiates the cancellation of the child only.
    CancelChildAfterParent,

    /// Cancelling the child initiates the cancellation of the parent only.
    CancelParentAfterChild,

    /// No cancellation link is established between the two processes.
    CancelInIsolation
}
331
332pub struct ProcessId {
334
335 cancel_initiated: RefComp<bool>,
337
338 cancel_activated: RefComp<bool>,
340
341 preemption_count: RefComp<isize>,
343
344 source: ObservableSource<ProcessEvent>,
346
347 started: RefComp<bool>,
349
350 react_flag: RefComp<Option<bool>>,
352
353 react_cont: RefComp<Option<ProcessBoxCont<()>>>,
355
356 react_priority: RefComp<isize>,
358
359 interrupt_flag: RefComp<Option<bool>>,
361
362 interrupt_cont: RefComp<Option<ProcessBoxCont<()>>>,
364
365 interrupt_time: RefComp<f64>,
367
368 interrupt_ver: RefComp<i64>,
370
371 interrupt_priority: RefComp<isize>
373}
374
375impl ProcessId {
376
377 pub fn new() -> NewProcessId {
379 NewProcessId {}
380 }
381
382 fn prepare(pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
384 let y = pid.started.read_at(p);
385 if y {
386 panic!("Another process with the specified identifier has been started already")
387 } else {
388 pid.started.write_at(true, p)
389 }
390 let disposable = {
391 let observable = pid.observable();
392 let pid = Grc::downgrade(&pid);
393 observable
394 .subscribe(cons_observer(move |e, p| {
395 match pid.upgrade() {
396 None => {
397 let msg = String::from("It should not happen");
398 let err = Error::retry(msg);
399 Result::Err(err)
400 },
401 Some(pid) => {
402 match e {
403 ProcessEvent::CancelInitiating => {
404 if pid.is_cancel_activated(p) {
405 match ProcessId::interrupt(pid.clone()).call_event(p) {
406 Result::Ok(()) => ProcessId::reactivate(pid).call_event(p),
407 Result::Err(e) => Result::Err(e)
408 }
409 } else {
410 Result::Ok(())
411 }
412 },
413 ProcessEvent::PreemptionInitiating => {
414 ProcessId::on_preempted_at(pid, p)
415 },
416 ProcessEvent::PreemptionEnding => {
417 Result::Ok(())
418 }
419 }
420 }
421 }
422 })).call_event(p)
423 };
424 match disposable {
425 Result::Ok(_) => Result::Ok(()),
426 Result::Err(e) => Result::Err(e)
427 }
428 }
429
430 #[inline]
432 pub fn observable(&self) -> Publish<ProcessEvent> {
433 self.source.publish()
434 }
435
436 #[inline]
438 pub fn cancel_initiating(&self) -> impl Observable<Message = ProcessEvent> + Clone {
439 self.observable().filter(|m| { *m == ProcessEvent::CancelInitiating })
440 }
441
442 #[doc(hidden)]
444 #[inline]
445 pub fn is_cancel_initiated_at(&self, p: &Point) -> bool {
446 self.cancel_initiated.read_at(p)
447 }
448
449 #[inline]
451 pub fn is_cancel_initiated(pid: Grc<ProcessId>) -> impl Event<Item = bool> + Clone {
452 cons_event(move |p| Result::Ok(pid.is_cancel_initiated_at(p)))
453 }
454
455 #[inline]
457 fn is_cancel_activated(&self, p: &Point) -> bool {
458 self.cancel_activated.read_at(p)
459 }
460
461 #[inline]
463 fn deactivate_cancel(&self, p: &Point) {
464 self.cancel_activated.write_at(false, p);
465 }
466
467 #[inline]
469 pub fn cancel(pid: Grc<ProcessId>) -> CancelById {
470 CancelById { pid: pid }
471 }
472
473 fn _bind_cancel(this: Grc<ProcessId>, others: Vec<Grc<ProcessId>>, p: &Point) -> simulation::Result<DisposableBox> {
475 let this2 = this.clone();
476 let this1 = this;
477 let others2 = {
478 others.iter().map(|pid| pid.clone()).collect::<Vec<_>>()
479 };
480 let others1 = others;
481 let hs1 = {
482 others1.iter().map(|pid| {
483 let this1 = Grc::downgrade(&this1);
484 pid.cancel_initiating()
485 .subscribe(cons_observer(move |_, p| {
486 match this1.upgrade() {
487 None => Result::Ok(()),
488 Some(this1) => this1.initiate_cancel_at(p)
489 }
490 }))
491 })
492 };
493 let hs1 = event_sequence(hs1).call_event(p);
494 match hs1 {
495 Result::Err(e) => Result::Err(e),
496 Result::Ok(hs1) => {
497 let hs2 = {
498 others2.iter().map(|pid| {
499 let pid = Grc::downgrade(&pid);
500 this2.cancel_initiating()
501 .subscribe(cons_observer(move |_, p| {
502 match pid.upgrade() {
503 None => Result::Ok(()),
504 Some(pid) => pid.initiate_cancel_at(p)
505 }
506 }))
507 })
508 };
509 let hs2 = event_sequence(hs2).call_event(p);
510 match hs2 {
511 Result::Err(e) => Result::Err(e),
512 Result::Ok(hs2) => {
513 let hs1 = concat_disposables(hs1.into_iter());
514 let hs2 = concat_disposables(hs2.into_iter());
515
516 Result::Ok(hs1.merge(hs2).into_boxed())
517 }
518 }
519 }
520 }
521 }
522
523 fn connect_cancel(parent: Grc<ProcessId>, cancellation: ProcessCancellation, child: Grc<ProcessId>, p: &Point) -> simulation::Result<DisposableBox> {
525 let h1 = match cancellation {
526 ProcessCancellation::CancelTogether |
527 ProcessCancellation::CancelChildAfterParent => {
528 let child = Grc::downgrade(&child);
529 parent.cancel_initiating()
530 .subscribe(cons_observer(move |_, p| {
531 match child.upgrade() {
532 None => Result::Ok(()),
533 Some(child) => child.initiate_cancel_at(p)
534 }
535 }))
536 .call_event(p)
537 },
538 ProcessCancellation::CancelParentAfterChild |
539 ProcessCancellation::CancelInIsolation => {
540 Result::Ok(empty_disposable().into_boxed())
541 }
542 };
543 match h1 {
544 Result::Err(e) => Result::Err(e),
545 Result::Ok(h1) => {
546 let h2 = match cancellation {
547 ProcessCancellation::CancelTogether |
548 ProcessCancellation::CancelParentAfterChild => {
549 let parent = Grc::downgrade(&parent);
550 child.cancel_initiating()
551 .subscribe(cons_observer(move |_, p| {
552 match parent.upgrade() {
553 None => Result::Ok(()),
554 Some(parent) => parent.initiate_cancel_at(p)
555 }
556 }))
557 .call_event(p)
558 },
559 ProcessCancellation::CancelChildAfterParent |
560 ProcessCancellation::CancelInIsolation => {
561 Result::Ok(empty_disposable().into_boxed())
562 }
563 };
564 match h2 {
565 Result::Err(e) => {
566 h1.call_box((p,))?;
567 Result::Err(e)
568 },
569 Result::Ok(h2) => Result::Ok(h1.merge(h2).into_boxed())
570 }
571 }
572 }
573 }
574
575 #[doc(hidden)]
577 pub fn initiate_cancel_at(&self, p: &Point) -> simulation::Result<()> {
578 let f = self.cancel_initiated.read_at(p);
579 if !f {
580 self.cancel_initiated.write_at(true, p);
581 self.cancel_activated.write_at(true, p);
582 self.source.trigger_at(&ProcessEvent::CancelInitiating, p)
583 } else {
584 Result::Ok(())
585 }
586 }
587
588 #[doc(hidden)]
590 pub fn begin_preemption_at(&self, p: &Point) -> simulation::Result<()> {
591 let f = self.cancel_initiated.read_at(p);
592 if !f {
593 let n = self.preemption_count.read_at(p);
594 self.preemption_count.write_at(n + 1, p);
595 if n == 0 {
596 self.source.trigger_at(&ProcessEvent::PreemptionInitiating, p)
597 } else {
598 Result::Ok(())
599 }
600 } else {
601 Result::Ok(())
602 }
603 }
604
605 #[doc(hidden)]
607 pub fn end_preemption_at(&self, p: &Point) -> simulation::Result<()> {
608 let f = self.cancel_initiated.read_at(p);
609 if !f {
610 let n = self.preemption_count.read_at(p);
611 self.preemption_count.write_at(n - 1, p);
612 if n - 1 == 0 {
613 self.source.trigger_at(&ProcessEvent::PreemptionEnding, p)
614 } else {
615 Result::Ok(())
616 }
617 } else {
618 Result::Ok(())
619 }
620 }
621
622 #[inline]
624 pub fn begin_preemption(pid: Grc<ProcessId>) -> impl Event<Item = ()> + Clone {
625 cons_event(move |p| pid.begin_preemption_at(p))
626 }
627
628 #[inline]
630 pub fn end_preemption(pid: Grc<ProcessId>) -> impl Event<Item = ()> + Clone {
631 cons_event(move |p| pid.end_preemption_at(p))
632 }
633
634 #[inline]
636 pub fn preemption_initiating(&self) -> impl Observable<Message = ProcessEvent> + Clone {
637 self.observable().filter(|m| { *m == ProcessEvent::PreemptionInitiating })
638 }
639
640 #[inline]
642 pub fn preemption_ending(&self) -> impl Observable<Message = ProcessEvent> + Clone {
643 self.observable().filter(|m| { *m == ProcessEvent::PreemptionEnding })
644 }
645
646 #[inline]
648 fn is_preempted(&self, p: &Point) -> bool {
649 self.preemption_count.read_at(p) > 0
650 }
651
652 #[inline]
655 pub fn interrupt(pid: Grc<ProcessId>) -> Interrupt {
656 Interrupt { pid: pid }
657 }
658
659 #[inline]
662 pub fn is_interrupted(pid: Grc<ProcessId>) -> IsInterrupted {
663 IsInterrupted { pid: pid }
664 }
665
666 #[inline]
670 pub fn interruption_time(pid: Grc<ProcessId>) -> InterruptionTime {
671 InterruptionTime { pid: pid }
672 }
673
674 #[inline]
677 pub fn is_passivated(pid: Grc<ProcessId>) -> IsPassivated {
678 IsPassivated { pid: pid }
679 }
680
681 #[inline]
683 pub fn reactivate(pid: Grc<ProcessId>) -> Reactivate {
684 Reactivate { pid: pid }
685 }
686
687 #[inline]
689 pub fn reactivate_immediately(pid: Grc<ProcessId>) -> ReactivateImmediately {
690 ReactivateImmediately { pid: pid }
691 }
692
693 #[inline]
695 pub fn reactivate_many_immediately<I>(pids: I) -> ReactivateManyImmediately<I::IntoIter>
696 where I: IntoIterator<Item = Grc<ProcessId>> + 'static
697 {
698 ReactivateManyImmediately { pids: pids.into_iter() }
699 }
700
701 fn on_preempted_at(pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
703 match pid.interrupt_cont.swap_at(None, p) {
704 Some(cont) => {
705 let t = pid.interrupt_time.read_at(p);
706 let dt = t - p.time;
707 let v = pid.interrupt_ver.read_at(p);
708 let priority = pid.interrupt_priority.read_at(p);
709 pid.interrupt_flag.write_at(Some(true), p);
710 pid.interrupt_ver.write_at(v.wrapping_add(1), p);
711 let cont = substitute_process_boxed(cont, move |cont, pid, (), p| {
712 restore_process_priority(priority, hold_process(dt))
713 .call_process_boxed(cont, pid, p)
714 });
715 reenter_process(cont, pid, (), p)
716 },
717 None => {
718 match pid.react_cont.swap_at(None, p) {
719 None => Result::Ok(()),
720 Some(cont) => {
721 let cont = substitute_process_boxed(cont, move |cont, pid, (), p| {
722 reenter_process(cont, pid, (), p)
723 });
724 pid.react_cont.write_at(Some(cont), p);
725 Result::Ok(())
726 }
727 }
728 }
729 }
730 }
731}
732
733impl PartialEq for ProcessId {
734
735 fn eq(&self, other: &Self) -> bool {
736 self.started == other.started
737 }
738}
739
740impl Eq for ProcessId {}
741
/// Computation that creates a new [`ProcessId`] when run within a
/// simulation.
#[derive(Clone)]
#[must_use = "computations are lazy and do nothing unless to be run"]
pub struct NewProcessId {}
746
747impl Simulation for NewProcessId {
748
749 type Item = ProcessId;
750
751 #[doc(hidden)]
752 fn call_simulation(self, r: &SimulationRun) -> simulation::Result<Self::Item> {
753 let specs = &r.specs;
754 Result::Ok(ProcessId {
755 cancel_initiated: RefComp::new(false),
756 cancel_activated: RefComp::new(false),
757 preemption_count: RefComp::new(0),
758 source: ObservableSource::new(),
759 started: RefComp::new(false),
760 react_flag: RefComp::new(None),
761 react_cont: RefComp::new(None),
762 react_priority: RefComp::new(0),
763 interrupt_flag: RefComp::new(None),
764 interrupt_cont: RefComp::new(None),
765 interrupt_time: RefComp::new(specs.start_time),
766 interrupt_ver: RefComp::new(0),
767 interrupt_priority: RefComp::new(0)
768 })
769 }
770}
771
/// Notification published through the observable of a `ProcessId`.
///
/// The type is a plain field-less enum, so it is `Copy` in addition to the
/// other derived traits.
#[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Eq, Ord)]
pub enum ProcessEvent {

    /// Triggered by `initiate_cancel_at` the first time cancellation is
    /// initiated.
    CancelInitiating,

    /// Triggered by `begin_preemption_at` when the preemption counter leaves
    /// zero.
    PreemptionInitiating,

    /// Triggered by `end_preemption_at` when the preemption counter returns
    /// to zero.
    PreemptionEnding
}
785
786pub trait Process {
788
789 type Item;
791
792 #[doc(hidden)]
794 fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
795 where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static;
796
797 #[doc(hidden)]
799 fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>;
800
801 #[inline]
803 fn and_then<U, F>(self, f: F) -> AndThen<Self, U, F>
804 where Self: Sized + 'static,
805 U: Process + 'static,
806 F: FnOnce(Self::Item) -> U + 'static,
807 {
808 AndThen { comp: self, f: f, _phantom: PhantomData }
809 }
810
811 #[inline]
813 fn map<B, F>(self, f: F) -> Map<Self, B, F>
814 where Self: Sized + 'static,
815 F: FnOnce(Self::Item) -> B + 'static,
816 {
817 Map { comp: self, f: f, _phantom: PhantomData }
818 }
819
820 #[inline]
822 fn zip<U>(self, other: U) -> Zip<Self, U>
823 where Self: Sized + 'static,
824 U: Process + 'static
825 {
826 Zip { comp: self, other: other }
827 }
828
829 #[inline]
831 fn ap<U, B>(self, other: U) -> Ap<Self, U, B>
832 where Self: Sized + 'static,
833 Self::Item: FnOnce(U::Item) -> B,
834 U: Process + 'static,
835 B: 'static
836 {
837 Ap { comp: self, other: other, _phantom: PhantomData }
838 }
839
840 #[inline]
842 fn finally<U>(self, finalization: U) -> Finally<Self, U>
843 where Self: Sized + 'static,
844 U: Process<Item = ()> + 'static
845 {
846 Finally { comp: self, finalization: finalization }
847 }
848
849 #[inline]
851 fn run(self) -> Run<Self>
852 where Self: Process<Item = ()> + Sized
853 {
854 Run { comp: self, pid: None }
855 }
856
857 #[inline]
859 fn run_using_id(self, pid: Grc<ProcessId>) -> Run<Self>
860 where Self: Process<Item = ()> + Sized
861 {
862 Run { comp: self, pid: Some(pid) }
863 }
864
865 #[inline]
867 fn into_boxed(self) -> ProcessBox<Self::Item>
868 where Self: Sized + 'static
869 {
870 ProcessBox::new(move |cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point| {
871 self.call_process_boxed(cont, pid, p)
872 })
873 }
874}
875
876pub trait IntoProcess {
878
879 type Process: Process<Item = Self::Item>;
881
882 type Item;
884
885 fn into_process(self) -> Self::Process;
887}
888
889impl<M: Process> IntoProcess for M {
890
891 type Process = M;
892
893 type Item = M::Item;
894
895 #[inline]
896 fn into_process(self) -> Self::Process {
897 self
898 }
899}
900
901#[doc(hidden)]
903pub struct ProcessBoxCont<T> {
904 f: Box<dyn ProcessContFnBox<T>>
905}
906
907#[doc(hidden)]
908impl<T> ProcessBoxCont<T> {
909
910 #[inline]
911 pub fn new<F>(f: F) -> Self
912 where F: FnOnce(simulation::Result<T>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
913 {
914 ProcessBoxCont { f: Box::new(f) }
915 }
916
917 #[inline]
918 pub fn call_box(self, args: (simulation::Result<T>, Grc<ProcessId>, &Point)) -> simulation::Result<()> {
919 self.f.call_box(args)
920 }
921}
922
923trait ProcessContFnBox<T> {
925
926 fn call_box(self: Box<Self>, args: (simulation::Result<T>, Grc<ProcessId>, &Point)) -> simulation::Result<()>;
928}
929
930impl<T, F> ProcessContFnBox<T> for F
931 where F: for<'a> FnOnce(simulation::Result<T>, Grc<ProcessId>, &'a Point) -> simulation::Result<()>
932{
933 fn call_box(self: Box<Self>, args: (simulation::Result<T>, Grc<ProcessId>, &Point)) -> simulation::Result<()> {
934 let this: Self = *self;
935 this(args.0, args.1, args.2)
936 }
937}
938
939#[must_use = "computations are lazy and do nothing unless to be run"]
941pub struct ProcessBox<T> {
942 f: Box<dyn ProcessFnBox<T>>
943}
944
945#[doc(hidden)]
946impl<T> ProcessBox<T> {
947
948 #[inline]
949 pub fn new<F>(f: F) -> Self
950 where F: FnOnce(ProcessBoxCont<T>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
951 {
952 ProcessBox { f: Box::new(f) }
953 }
954
955 #[inline]
956 pub fn call_box(self, args: (ProcessBoxCont<T>, Grc<ProcessId>, &Point)) -> simulation::Result<()> {
957 self.f.call_box(args)
958 }
959}
960
961impl<T> Process for ProcessBox<T> {
962
963 type Item = T;
964
965 #[doc(hidden)]
966 #[inline]
967 fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
968 where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
969 {
970 let cont = ProcessBoxCont::new(cont);
971 self.call_box((cont, pid, p))
972 }
973
974 #[doc(hidden)]
975 #[inline]
976 fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
977 self.call_box((cont, pid, p))
978 }
979
980 #[inline]
981 fn into_boxed(self) -> ProcessBox<Self::Item>
982 where Self: Sized + 'static
983 {
984 self
985 }
986}
987
988trait ProcessFnBox<T> {
990
991 fn call_box(self: Box<Self>, args: (ProcessBoxCont<T>, Grc<ProcessId>, &Point)) -> simulation::Result<()>;
993}
994
995impl<T, F> ProcessFnBox<T> for F
996 where F: for<'a> FnOnce(ProcessBoxCont<T>, Grc<ProcessId>, &'a Point) -> simulation::Result<()>
997{
998 fn call_box(self: Box<Self>, args: (ProcessBoxCont<T>, Grc<ProcessId>, &Point)) -> simulation::Result<()> {
999 let this: Self = *self;
1000 this(args.0, args.1, args.2)
1001 }
1002}
1003
/// Computation that yields an already known value.
#[derive(Clone)]
#[must_use = "computations are lazy and do nothing unless to be run"]
pub struct Return<T> {

    /// The value handed to the continuation.
    val: T
}
1012
1013impl<T> Process for Return<T> {
1014
1015 type Item = T;
1016
1017 #[doc(hidden)]
1018 #[inline]
1019 fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
1020 where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
1021 {
1022 if is_process_cancelled(&pid, p) {
1023 revoke_process(cont, pid, p)
1024 } else {
1025 let Return { val } = self;
1026 cont(Result::Ok(val), pid, p)
1027 }
1028 }
1029
1030 #[doc(hidden)]
1031 #[inline]
1032 fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
1033 if is_process_cancelled(&pid, p) {
1034 revoke_process_boxed(cont, pid, p)
1035 } else {
1036 let Return { val } = self;
1037 cont.call_box((Result::Ok(val), pid, p))
1038 }
1039 }
1040}
1041
/// Computation that defers building the actual process until it is run.
#[derive(Clone)]
#[must_use = "computations are lazy and do nothing unless to be run"]
pub struct Delay<F, M> {

    /// The closure that produces the process.
    f: F,

    /// Marker for the type of the produced process.
    _phantom: PhantomData<M>
}
1053
1054impl<F, M> Process for Delay<F, M>
1055 where F: FnOnce() -> M + 'static,
1056 M: Process + 'static
1057{
1058 type Item = M::Item;
1059
1060 #[doc(hidden)]
1061 #[inline]
1062 fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
1063 where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
1064 {
1065 let Delay { f, _phantom } = self;
1066 return_process(()).and_then(move |_| {
1067 f()
1068 }).call_process(cont, pid, p)
1069 }
1070
1071 #[doc(hidden)]
1072 #[inline]
1073 fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
1074 let Delay { f, _phantom } = self;
1075 return_process(()).and_then(move |_| {
1076 f()
1077 }).call_process_boxed(cont, pid, p)
1078 }
1079}
1080
/// Sequential composition of a process with a function that produces the
/// next process from its result.
#[derive(Clone)]
#[must_use = "computations are lazy and do nothing unless to be run"]
pub struct AndThen<M, U, F> {

    /// The process that runs first.
    comp: M,

    /// The function producing the process that runs next.
    f: F,

    /// Marker for the type of the next process.
    _phantom: PhantomData<U>
}
1095
1096impl<M, U, F> Process for AndThen<M, U, F>
1097 where F: FnOnce(M::Item) -> U + 'static,
1098 M: Process + 'static,
1099 U: Process + 'static
1100{
1101 type Item = U::Item;
1102
1103 #[doc(hidden)]
1104 #[inline]
1105 fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
1106 where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
1107 {
1108 if is_process_cancelled(&pid, p) {
1111 revoke_process(cont, pid, p)
1112 } else {
1113 let AndThen { comp, f, _phantom } = self;
1114 let cont = move |a: simulation::Result<M::Item>, pid: Grc<ProcessId>, p: &Point| {
1115 match a {
1116 Result::Ok(a) => {
1117 f(a).call_process(cont, pid, p)
1118 },
1119 Result::Err(Error::Cancel) => {
1120 revoke_process(cont, pid, p)
1121 },
1122 Result::Err(Error::Other(e)) => {
1123 match e.deref() {
1124 &OtherError::Retry(_) => {
1125 cut_error_process(cont, pid, e, p)
1126 },
1127 &OtherError::Panic(_) => {
1128 cut_error_process(cont, pid, e, p)
1129 },
1130 &OtherError::IO(_) => {
1131 propagate_error_process(cont, pid, e, p)
1132 }
1133 }
1134 }
1135 }
1136 };
1137 comp.call_process(cont, pid, p)
1138 }
1139 }
1140
1141 #[doc(hidden)]
1142 #[inline]
1143 fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
1144 if is_process_cancelled(&pid, p) {
1147 revoke_process_boxed(cont, pid, p)
1148 } else {
1149 let AndThen { comp, f, _phantom } = self;
1150 let cont = move |a: simulation::Result<M::Item>, pid: Grc<ProcessId>, p: &Point| {
1151 match a {
1152 Result::Ok(a) => {
1153 f(a).call_process_boxed(cont, pid, p)
1154 },
1155 Result::Err(Error::Cancel) => {
1156 revoke_process_boxed(cont, pid, p)
1157 },
1158 Result::Err(Error::Other(e)) => {
1159 match e.deref() {
1160 &OtherError::Retry(_) => {
1161 cut_error_process_boxed(cont, pid, e, p)
1162 },
1163 &OtherError::Panic(_) => {
1164 cut_error_process_boxed(cont, pid, e, p)
1165 },
1166 &OtherError::IO(_) => {
1167 propagate_error_process_boxed(cont, pid, e, p)
1168 }
1169 }
1170 }
1171 }
1172 };
1173 comp.call_process(cont, pid, p)
1174 }
1175 }
1176}
1177
/// A process paired with a finalization process that runs after it.
#[derive(Clone)]
#[must_use = "computations are lazy and do nothing unless to be run"]
pub struct Finally<M, U> {

    /// The main process.
    comp: M,

    /// The process run once the main one has produced its outcome.
    finalization: U
}
1189
1190impl<M, U> Process for Finally<M, U>
1191 where M: Process + 'static,
1192 U: Process<Item = ()> + 'static
1193{
1194 type Item = M::Item;
1195
1196 #[doc(hidden)]
1197 #[inline]
1198 fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
1199 where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
1200 {
1201 if is_process_cancelled(&pid, p) {
1202 revoke_process(cont, pid, p)
1203 } else {
1204 let Finally { comp, finalization } = self;
1205 let cont = move |a: simulation::Result<M::Item>, pid: Grc<ProcessId>, p: &Point| {
1206 let cont = move |a0: simulation::Result<()>, pid: Grc<ProcessId>, p: &Point| {
1207 match a0 {
1208 Result::Ok(()) => cont(a, pid, p),
1209 Result::Err(e0) => {
1210 match a {
1211 Result::Ok(_) => cont(Result::Err(e0), pid, p),
1212 Result::Err(e) => cont(Result::Err(e.merge(&e0)), pid, p)
1213 }
1214 }
1215 }
1216 };
1217 finalization.call_process(cont, pid, p)
1218 };
1219 comp.call_process(cont, pid, p)
1220 }
1221 }
1222
1223 #[doc(hidden)]
1224 #[inline]
1225 fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
1226 if is_process_cancelled(&pid, p) {
1227 revoke_process_boxed(cont, pid, p)
1228 } else {
1229 let Finally { comp, finalization } = self;
1230 let cont = move |a: simulation::Result<M::Item>, pid: Grc<ProcessId>, p: &Point| {
1231 let cont = move |a0: simulation::Result<()>, pid: Grc<ProcessId>, p: &Point| {
1232 match a0 {
1233 Result::Ok(()) => cont.call_box((a, pid, p)),
1234 Result::Err(e0) => {
1235 match a {
1236 Result::Ok(_) => cont.call_box((Result::Err(e0), pid, p)),
1237 Result::Err(e) => cont.call_box((Result::Err(e.merge(&e0)), pid, p))
1238 }
1239 }
1240 }
1241 };
1242 finalization.call_process(cont, pid, p)
1243 };
1244 comp.call_process(cont, pid, p)
1245 }
1246 }
1247}
1248
/// A process whose result is transformed by a function.
#[derive(Clone)]
#[must_use = "computations are lazy and do nothing unless to be run"]
pub struct Map<M, B, F> {

    /// The process whose result is transformed.
    comp: M,

    /// The transformation applied to the result.
    f: F,

    /// Marker for the type of the transformed result.
    _phantom: PhantomData<B>
}
1263
1264impl<M, B, F> Process for Map<M, B, F>
1265 where F: FnOnce(M::Item) -> B + 'static,
1266 M: Process + 'static,
1267 B: 'static
1268{
1269 type Item = B;
1270
1271 #[doc(hidden)]
1272 #[inline]
1273 fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
1274 where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
1275 {
1276 let Map { comp, f, _phantom } = self;
1277 comp.and_then(move |a| {
1278 return_process(f(a))
1279 }).call_process(cont, pid, p)
1280 }
1281
1282 #[doc(hidden)]
1283 #[inline]
1284 fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
1285 let Map { comp, f, _phantom } = self;
1286 comp.and_then(move |a| {
1287 return_process(f(a))
1288 }).call_process_boxed(cont, pid, p)
1289 }
1290}
1291
/// Two processes run one after another, yielding both results as a pair.
#[derive(Clone)]
#[must_use = "computations are lazy and do nothing unless to be run"]
pub struct Zip<M, U> {

    /// The process that runs first.
    comp: M,

    /// The process that runs second.
    other: U,
}
1303
1304impl<M, U> Process for Zip<M, U>
1305 where M: Process + 'static,
1306 U: Process + 'static
1307{
1308 type Item = (M::Item, U::Item);
1309
1310 #[doc(hidden)]
1311 #[inline]
1312 fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
1313 where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
1314 {
1315 let Zip { comp, other } = self;
1316 comp.and_then(move |a| {
1317 other.and_then(move |b| {
1318 return_process((a, b))
1319 })
1320 }).call_process(cont, pid, p)
1321 }
1322
1323 #[doc(hidden)]
1324 #[inline]
1325 fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
1326 let Zip { comp, other } = self;
1327 comp.and_then(move |a| {
1328 other.and_then(move |b| {
1329 return_process((a, b))
1330 })
1331 }).call_process_boxed(cont, pid, p)
1332 }
1333}
1334
1335#[must_use = "computations are lazy and do nothing unless to be run"]
1337#[derive(Clone)]
1338pub struct Ap<M, U, B> {
1339
1340 comp: M,
1342
1343 other: U,
1345
1346 _phantom: PhantomData<B>
1348}
1349
1350impl<M, U, B> Process for Ap<M, U, B>
1351 where M: Process + 'static,
1352 U: Process + 'static,
1353 M::Item: FnOnce(U::Item) -> B,
1354 B: 'static
1355{
1356 type Item = B;
1357
1358 #[doc(hidden)]
1359 #[inline]
1360 fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
1361 where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
1362 {
1363 let Ap { comp, other, _phantom } = self;
1364 comp.and_then(move |f| {
1365 other.and_then(move |a| {
1366 return_process(f(a))
1367 })
1368 }).call_process(cont, pid, p)
1369 }
1370
1371 #[doc(hidden)]
1372 #[inline]
1373 fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
1374 let Ap { comp, other, _phantom } = self;
1375 comp.and_then(move |f| {
1376 other.and_then(move |a| {
1377 return_process(f(a))
1378 })
1379 }).call_process_boxed(cont, pid, p)
1380 }
1381}
1382
1383#[must_use = "computations are lazy and do nothing unless to be run"]
1385pub struct Run<M> {
1386
1387 comp: M,
1389
1390 pid: Option<Grc<ProcessId>>
1392}
1393
1394impl<M> Event for Run<M>
1395 where M: Process<Item = ()>
1396{
1397 type Item = M::Item;
1398
1399 #[doc(hidden)]
1400 #[inline]
1401 fn call_event(self, p: &Point) -> simulation::Result<M::Item> {
1402 let Run { comp, pid } = self;
1403 let pid = {
1404 match pid {
1405 Some(pid) => pid,
1406 None => {
1407 match ProcessId::new().into_event().call_event(p) {
1408 Result::Ok(pid) => Grc::new(pid),
1409 Result::Err(e) => return Result::Err(e)
1410 }
1411 }
1412 }
1413 };
1414 let cont = &initial_cont;
1415 match ProcessId::prepare(pid.clone(), p) {
1416 Result::Err(e) => Result::Err(e),
1417 Result::Ok(()) => comp.call_process(cont, pid, p)
1418 }
1419 }
1420}
1421
1422fn initial_cont(_a: simulation::Result<()>, _pid: Grc<ProcessId>, _p: &Point) -> simulation::Result<()> {
1424 Result::Ok(())
1425}
1426
1427#[must_use = "computations are lazy and do nothing unless to be run"]
1429#[derive(Clone)]
1430pub struct Id {}
1431
1432impl Process for Id {
1433
1434 type Item = Grc<ProcessId>;
1435
1436 #[doc(hidden)]
1437 #[inline]
1438 fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
1439 where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
1440 {
1441 cont(Result::Ok(pid.clone()), pid, p)
1442 }
1443
1444 #[doc(hidden)]
1445 #[inline]
1446 fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
1447 cont.call_box((Result::Ok(pid.clone()), pid, p))
1448 }
1449}
1450
1451#[must_use = "computations are lazy and do nothing unless to be run"]
1453#[derive(Clone)]
1454pub struct CancelById {
1455
1456 pid: Grc<ProcessId>
1458}
1459
1460impl Event for CancelById {
1461
1462 type Item = ();
1463
1464 #[doc(hidden)]
1465 #[inline]
1466 fn call_event(self, p: &Point) -> simulation::Result<()> {
1467 let CancelById { pid } = self;
1468 pid.initiate_cancel_at(p)
1469 }
1470}
1471
1472#[must_use = "computations are lazy and do nothing unless to be run"]
1474#[derive(Clone)]
1475pub struct Cancel<T> {
1476
1477 _phantom: PhantomData<T>
1479}
1480
1481impl<T> Process for Cancel<T> {
1482
1483 type Item = T;
1484
1485 #[doc(hidden)]
1486 #[inline]
1487 fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
1488 where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
1489 {
1490 match pid.initiate_cancel_at(p) {
1491 Result::Ok(()) => {
1492 if is_process_cancelled(&pid, p) {
1493 revoke_process(cont, pid, p)
1494 } else {
1495 Result::Ok(())
1496 }
1497 },
1498 Result::Err(e) => Result::Err(e)
1499 }
1500 }
1501
1502 #[doc(hidden)]
1503 #[inline]
1504 fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
1505 match pid.initiate_cancel_at(p) {
1506 Result::Ok(()) => {
1507 if is_process_cancelled(&pid, p) {
1508 revoke_process_boxed(cont, pid, p)
1509 } else {
1510 Result::Ok(())
1511 }
1512 },
1513 Result::Err(e) => Result::Err(e)
1514 }
1515 }
1516}
1517
/// A process computation that suspends the current process for the time interval `dt`.
///
/// While the process is held, its continuation is stored in the process identifier,
/// from where `Interrupt` can take it to resume the process earlier.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct Hold {

    /// The time interval added to the current modeling time.
    dt: f64
}

impl Process for Hold {

    type Item = ();

    // Boxes the continuation and delegates to `call_process_boxed`.
    #[doc(hidden)]
    #[inline]
    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
    {
        let cont = ProcessBoxCont::new(cont);
        self.call_process_boxed(cont, pid, p)
    }

    // Stores the continuation and the wake-up data in the process identifier,
    // then enqueues an event at time `p.time + dt` that resumes the process.
    #[doc(hidden)]
    #[inline]
    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
        let Hold { dt } = self;
        let t = p.time + dt;
        pid.interrupt_cont.write_at(Some(cont), p);
        // `Some(false)` marks the process as held but not interrupted
        pid.interrupt_flag.write_at(Some(false), p);
        pid.interrupt_time.write_at(t, p);
        pid.interrupt_priority.write_at(p.priority, p);
        // remember the version; `Interrupt` increments it, which invalidates this wake-up
        let v = pid.interrupt_ver.read_at(p);
        enqueue_event(t, {
            cons_event(move |p| {
                if p.priority >= p.minimal_priority {
                    let v2 = pid.interrupt_ver.read_at(p);
                    if v == v2 {
                        // not interrupted in the meantime: take the continuation and resume
                        pid.interrupt_flag.write_at(None, p);
                        let cont = pid.interrupt_cont.swap_at(None, p).unwrap();
                        resume_process_boxed(cont, pid, (), p)
                    } else {
                        // the version has changed, so this wake-up is stale
                        Result::Ok(())
                    }

                } else {
                    // the priority is below the minimal one: cancel the process instead of resuming
                    pid.initiate_cancel_at(p)?;
                    if is_process_cancelled(&pid, p) {
                        let v2 = pid.interrupt_ver.read_at(p);
                        if v == v2 {
                            pid.interrupt_flag.write_at(None, p);
                            let cont = pid.interrupt_cont.swap_at(None, p).unwrap();
                            revoke_process_boxed(cont, pid, p)
                        } else {
                            Result::Ok(())
                        }
                    } else {
                        Result::Ok(())
                    }
                }
            }).into_boxed()
        }).call_event(p)
    }
}
1581
1582#[must_use = "computations are lazy and do nothing unless to be run"]
1585#[derive(Clone)]
1586pub struct Interrupt {
1587
1588 pid: Grc<ProcessId>
1590}
1591
1592impl Event for Interrupt {
1593
1594 type Item = ();
1595
1596 #[doc(hidden)]
1597 #[inline]
1598 fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
1599 let Interrupt { pid } = self;
1600 match pid.interrupt_cont.swap_at(None, p) {
1601 None => Result::Ok(()),
1602 Some(cont) => {
1603 let v = pid.interrupt_ver.read_at(p);
1604 let priority = pid.interrupt_priority.read_at(p);
1605 pid.interrupt_ver.write_at(v.wrapping_add(1), p);
1606 pid.interrupt_flag.write_at(Some(true), p);
1607 enqueue_event_with_priority(p.time, priority, {
1608 cons_event(move |p| {
1609 resume_process_boxed(cont, pid, (), p)
1610 }).into_boxed()
1611 }).call_event(p)
1612 }
1613 }
1614 }
1615}
1616
1617#[must_use = "computations are lazy and do nothing unless to be run"]
1619#[derive(Clone)]
1620pub struct IsInterrupted {
1621
1622 pid: Grc<ProcessId>
1624}
1625
1626impl Event for IsInterrupted {
1627
1628 type Item = bool;
1629
1630 #[doc(hidden)]
1631 #[inline]
1632 fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
1633 let IsInterrupted { pid } = self;
1634 match pid.interrupt_flag.read_at(p) {
1635 Some(true) => Result::Ok(true),
1636 _ => Result::Ok(false)
1637 }
1638 }
1639}
1640
1641#[must_use = "computations are lazy and do nothing unless to be run"]
1644#[derive(Clone)]
1645pub struct InterruptionTime {
1646
1647 pid: Grc<ProcessId>
1649}
1650
1651impl Event for InterruptionTime {
1652
1653 type Item = Option<f64>;
1654
1655 #[doc(hidden)]
1656 #[inline]
1657 fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
1658 let InterruptionTime { pid } = self;
1659 match pid.interrupt_flag.read_at(p) {
1660 Some(false) => Result::Ok(Some(pid.interrupt_time.read_at(p))),
1661 _ => Result::Ok(None)
1662 }
1663 }
1664}
1665
1666#[must_use = "computations are lazy and do nothing unless to be run"]
1668#[derive(Clone)]
1669pub struct Passivate {}
1670
1671impl Process for Passivate {
1672
1673 type Item = ();
1674
1675 #[doc(hidden)]
1676 #[inline]
1677 fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
1678 where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
1679 {
1680 let cont = ProcessBoxCont::new(cont);
1681 self.call_process_boxed(cont, pid, p)
1682 }
1683
1684 #[doc(hidden)]
1685 #[inline]
1686 fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
1687 match pid.react_flag.read_at(p) {
1688 Some(true) => {
1689 let msg = String::from("Cannot passivate the process twice");
1690 let err = OtherError::retry(msg);
1691 cut_error_process_boxed(cont, pid, err, p)
1692 },
1693 _ => {
1694 match pid.react_cont.swap_at(Some(cont), p) {
1695 Some(_) => panic!("The reactivation continuation has diverged"),
1696 None => {
1697 pid.react_priority.write_at(p.priority, p);
1698 pid.react_flag.write_at(Some(true), p);
1699 Result::Ok(())
1700 }
1701 }
1702 }
1703 }
1704 }
1705}
1706
1707#[must_use = "computations are lazy and do nothing unless to be run"]
1709#[derive(Clone)]
1710pub struct PassivateBefore<M> {
1711
1712 comp: M
1714}
1715
1716impl<M> Process for PassivateBefore<M>
1717 where M: Event<Item = ()>
1718{
1719 type Item = ();
1720
1721 #[doc(hidden)]
1722 #[inline]
1723 fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
1724 where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
1725 {
1726 let cont = ProcessBoxCont::new(cont);
1727 self.call_process_boxed(cont, pid, p)
1728 }
1729
1730 #[doc(hidden)]
1731 #[inline]
1732 fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
1733 match pid.react_flag.read_at(p) {
1734 Some(true) => {
1735 let msg = String::from("Cannot passivate the process twice");
1736 let err = OtherError::retry(msg);
1737 cut_error_process_boxed(cont, pid, err, p)
1738 },
1739 _ => {
1740 match pid.react_cont.swap_at(Some(cont), p) {
1741 Some(_) => panic!("The reactivation continuation has diverged"),
1742 None => {
1743 let PassivateBefore { comp } = self;
1744 pid.react_priority.write_at(p.priority, p);
1745 pid.react_flag.write_at(Some(true), p);
1746 comp.call_event(p)
1747 }
1748 }
1749 }
1750 }
1751 }
1752}
1753
1754#[must_use = "computations are lazy and do nothing unless to be run"]
1756#[derive(Clone)]
1757pub struct IsPassivated {
1758
1759 pid: Grc<ProcessId>
1761}
1762
1763impl Event for IsPassivated {
1764
1765 type Item = bool;
1766
1767 #[doc(hidden)]
1768 #[inline]
1769 fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
1770 let IsPassivated { pid } = self;
1771 match pid.react_flag.read_at(p) {
1772 Some(true) => Result::Ok(true),
1773 _ => Result::Ok(false)
1774 }
1775 }
1776}
1777
1778#[must_use = "computations are lazy and do nothing unless to be run"]
1780#[derive(Clone)]
1781pub struct Reactivate {
1782
1783 pid: Grc<ProcessId>
1785}
1786
1787impl Event for Reactivate {
1788
1789 type Item = ();
1790
1791 #[doc(hidden)]
1792 #[inline]
1793 fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
1794 let Reactivate { pid } = self;
1795 match pid.react_cont.swap_at(None, p) {
1796 Some(cont) => {
1797 match pid.react_flag.swap_at(None, p) {
1798 Some(true) => {
1799 let priority = pid.react_priority.read_at(p);
1800 enqueue_event_with_priority(p.time, priority, {
1801 cons_event(move |p| {
1802 resume_process_boxed(cont, pid, (), p)
1803 }).into_boxed()
1804 }).call_event(p)
1805 },
1806 _ => {
1807 panic!("The reactivation continuation has diverged")
1808 }
1809 }
1810 },
1811 None => {
1812 Result::Ok(())
1813 }
1814 }
1815 }
1816}
1817
1818#[must_use = "computations are lazy and do nothing unless to be run"]
1820#[derive(Clone)]
1821pub struct ReactivateImmediately {
1822
1823 pid: Grc<ProcessId>
1825}
1826
1827impl Event for ReactivateImmediately {
1828
1829 type Item = ();
1830
1831 #[doc(hidden)]
1832 #[inline]
1833 fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
1834 let ReactivateImmediately { pid } = self;
1835 match pid.react_cont.swap_at(None, p) {
1836 Some(cont) => {
1837 match pid.react_flag.swap_at(None, p) {
1838 Some(true) => {
1839 let priority = pid.react_priority.read_at(p);
1840 if p.priority == priority {
1841 resume_process_boxed(cont, pid, (), p)
1842 } else {
1843 enqueue_event_with_priority(p.time, priority, {
1844 cons_event(move |p| {
1845 resume_process_boxed(cont, pid, (), p)
1846 }).into_boxed()
1847 }).call_event(p)
1848 }
1849 },
1850 _ => {
1851 panic!("The reactivation continuation has diverged")
1852 }
1853 }
1854 },
1855 None => {
1856 Result::Ok(())
1857 }
1858 }
1859 }
1860}
1861
1862#[must_use = "computations are lazy and do nothing unless to be run"]
1864#[derive(Clone)]
1865pub struct ReactivateManyImmediately<I> {
1866
1867 pids: I
1869}
1870
1871impl<I> Event for ReactivateManyImmediately<I>
1872 where I: Iterator<Item = Grc<ProcessId>> + 'static
1873{
1874 type Item = ();
1875
1876 #[doc(hidden)]
1877 #[inline]
1878 fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
1879 let ReactivateManyImmediately { mut pids } = self;
1880 match pids.next() {
1881 None => Result::Ok(()),
1882 Some(pid) => {
1883 match ProcessId::reactivate_immediately(pid).call_event(p) {
1884 Result::Err(e) => Result::Err(e),
1885 Result::Ok(()) => {
1886 let comp = ReactivateManyImmediately { pids: pids };
1887 yield_event(comp).call_event(p)
1888 }
1889 }
1890 }
1891 }
1892 }
1893}
1894
1895#[must_use = "computations are lazy and do nothing unless to be run"]
1897#[derive(Clone)]
1898pub struct Loop<F, M> {
1899
1900 f: F,
1902
1903 _phantom: PhantomData<M>
1905}
1906
1907impl<F, M> Process for Loop<F, M>
1908 where F: Fn() -> M + 'static,
1909 M: Process<Item = ()> + 'static
1910{
1911 type Item = M::Item;
1912
1913 #[doc(hidden)]
1914 #[inline(never)]
1915 fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
1916 where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
1917 {
1918 let Loop { f, _phantom } = self;
1919 let comp = f();
1920 comp.and_then(move |()| {
1921 Loop { f: f, _phantom: PhantomData }
1922 }).call_process(cont, pid, p)
1923 }
1924
1925 #[doc(hidden)]
1926 #[inline(never)]
1927 fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
1928 let Loop { f, _phantom } = self;
1929 let comp = f();
1930 comp.and_then(move |()| {
1931 Loop { f: f, _phantom: PhantomData }
1932 }).call_process_boxed(cont, pid, p)
1933 }
1934}
1935
1936#[must_use = "computations are lazy and do nothing unless to be run"]
1938#[derive(Clone)]
1939pub struct Sequence<I, M> where M: Process {
1940
1941 comps: I,
1943
1944 acc: Vec<M::Item>
1946}
1947
1948impl<I, M> Process for Sequence<I, M>
1949 where I: Iterator<Item = M> + 'static,
1950 M: Process + 'static
1951{
1952 type Item = Vec<M::Item>;
1953
1954 #[doc(hidden)]
1955 fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
1956 where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
1957 {
1958 let Sequence { mut comps, mut acc } = self;
1959 match comps.next() {
1960 None => cont(Result::Ok(acc), pid, p),
1961 Some(comp) => {
1962 comp.and_then(move |a| {
1963 acc.push(a);
1964 Sequence { comps: comps, acc: acc }
1965 }).call_process(cont, pid, p)
1966 }
1967 }
1968 }
1969
1970 #[doc(hidden)]
1971 fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
1972 let Sequence { mut comps, mut acc } = self;
1973 match comps.next() {
1974 None => cont.call_box((Result::Ok(acc), pid, p)),
1975 Some(comp) => {
1976 comp.and_then(move |a| {
1977 acc.push(a);
1978 Sequence { comps: comps, acc: acc }
1979 }).call_process_boxed(cont, pid, p)
1980 }
1981 }
1982 }
1983}
1984
1985#[must_use = "computations are lazy and do nothing unless to be run"]
1987#[derive(Clone)]
1988pub struct Sequence_<I, M>
1989{
1990 comps: I,
1992
1993 _phantom: PhantomData<M>
1995}
1996
1997impl<I, M> Process for Sequence_<I, M>
1998 where I: Iterator<Item = M> + 'static,
1999 M: Process + 'static
2000{
2001 type Item = ();
2002
2003 #[doc(hidden)]
2004 fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
2005 where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
2006 {
2007 let Sequence_ { mut comps, _phantom } = self;
2008 match comps.next() {
2009 None => cont(Result::Ok(()), pid, p),
2010 Some(comp) => {
2011 comp.and_then(move |_| {
2012 Sequence_ { comps: comps, _phantom: _phantom }
2013 }).call_process(cont, pid, p)
2014 }
2015 }
2016 }
2017
2018 #[doc(hidden)]
2019 fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
2020 let Sequence_ { mut comps, _phantom } = self;
2021 match comps.next() {
2022 None => cont.call_box((Result::Ok(()), pid, p)),
2023 Some(comp) => {
2024 comp.and_then(move |_| {
2025 Sequence_ { comps: comps, _phantom: _phantom }
2026 }).call_process_boxed(cont, pid, p)
2027 }
2028 }
2029 }
2030}
2031
2032#[must_use = "computations are lazy and do nothing unless to be run"]
2034#[derive(Clone)]
2035pub struct SpawnUsingId<M>
2036{
2037 cancellation: ProcessCancellation,
2039
2040 comp: M,
2042
2043 comp_id: Grc<ProcessId>
2045}
2046
2047impl<M> Process for SpawnUsingId<M>
2048 where M: Process<Item = ()> + 'static,
2049{
2050 type Item = ();
2051
2052 #[doc(hidden)]
2053 fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
2054 where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
2055 {
2056 if is_process_cancelled(&pid, p) {
2057 revoke_process(cont, pid, p)
2058 } else {
2059 let SpawnUsingId { cancellation, comp, comp_id } = self;
2060 match ProcessId::prepare(comp_id.clone(), p) {
2061 Result::Err(e) => Result::Err(e),
2062 Result::Ok(()) => {
2063 let hs = ProcessId::connect_cancel(pid.clone(), cancellation, comp_id.clone(), p);
2064 match hs {
2065 Result::Err(e) => Result::Err(e),
2066 Result::Ok(hs) => {
2067 enqueue_event(p.time, {
2068 cons_event(move |p| {
2069 let cont = move |a: simulation::Result<()>, _pid: Grc<ProcessId>, p: &Point| {
2070 hs.dispose(p)?;
2071 match a {
2072 Result::Ok(()) => Result::Ok(()),
2073 Result::Err(Error::Cancel) => Result::Ok(()),
2074 Result::Err(Error::Other(e)) => Result::Err(Error::Other(e))
2075 }
2076 };
2077 comp.call_process(cont, comp_id, p)
2078 }).into_boxed()
2079 }).and_then(move |()| {
2080 cons_event(move |p| {
2081 resume_process(cont, pid, (), p)
2082 })
2083 }).call_event(p)
2084 }
2085 }
2086 }
2087 }
2088 }
2089 }
2090
2091 #[doc(hidden)]
2092 fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
2093 if is_process_cancelled(&pid, p) {
2094 revoke_process_boxed(cont, pid, p)
2095 } else {
2096 let SpawnUsingId { cancellation, comp, comp_id } = self;
2097 match ProcessId::prepare(comp_id.clone(), p) {
2098 Result::Err(e) => Result::Err(e),
2099 Result::Ok(()) => {
2100 let hs = ProcessId::connect_cancel(pid.clone(), cancellation, comp_id.clone(), p);
2101 match hs {
2102 Result::Err(e) => Result::Err(e),
2103 Result::Ok(hs) => {
2104 enqueue_event(p.time, {
2105 cons_event(move |p| {
2106 let cont = move |a: simulation::Result<()>, _pid: Grc<ProcessId>, p: &Point| {
2107 hs.dispose(p)?;
2108 match a {
2109 Result::Ok(()) => Result::Ok(()),
2110 Result::Err(Error::Cancel) => Result::Ok(()),
2111 Result::Err(Error::Other(e)) => Result::Err(Error::Other(e))
2112 }
2113 };
2114 comp.call_process(cont, comp_id, p)
2115 }).into_boxed()
2116 }).and_then(move |()| {
2117 cons_event(move |p| {
2118 resume_process_boxed(cont, pid, (), p)
2119 })
2120 }).call_event(p)
2121 }
2122 }
2123 }
2124 }
2125 }
2126 }
2127}
2128
2129#[must_use = "computations are lazy and do nothing unless to be run"]
2131#[derive(Clone)]
2132pub struct Spawn<M>
2133{
2134 cancellation: ProcessCancellation,
2136
2137 comp: M
2139}
2140
2141impl<M> Process for Spawn<M>
2142 where M: Process<Item = ()> + 'static
2143{
2144 type Item = ();
2145
2146 #[doc(hidden)]
2147 fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
2148 where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
2149 {
2150 if is_process_cancelled(&pid, p) {
2151 revoke_process(cont, pid, p)
2152 } else {
2153 let Spawn { cancellation, comp } = self;
2154 ProcessId::new()
2155 .into_process()
2156 .and_then(move |pid| {
2157 spawn_process_using_id_with(cancellation, Grc::new(pid), comp)
2158 })
2159 .call_process(cont, pid, p)
2160 }
2161 }
2162
2163 #[doc(hidden)]
2164 fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
2165 if is_process_cancelled(&pid, p) {
2166 revoke_process_boxed(cont, pid, p)
2167 } else {
2168 let Spawn { cancellation, comp } = self;
2169 ProcessId::new()
2170 .into_process()
2171 .and_then(move |pid| {
2172 spawn_process_using_id_with(cancellation, Grc::new(pid), comp)
2173 })
2174 .call_process_boxed(cont, pid, p)
2175 }
2176 }
2177}
2178
/// A process computation that runs `comp` under the identifier `comp_id` and
/// cancels it after the `timeout` interval has been held.
///
/// The result is `Some(item)` when `comp` has produced an item, and `None`
/// when it ended without producing one.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct TimeoutUsingId<M>
{
    /// The interval passed to `hold_process` before `comp` is cancelled.
    timeout: f64,

    /// The process computation to run within the timeout.
    comp: M,

    /// The identifier the computation runs under.
    comp_id: Grc<ProcessId>
}

impl<M> Process for TimeoutUsingId<M>
    where M: Process + 'static,
          M::Item: Clone
{
    type Item = Option<M::Item>;

    // Spawns two child processes: a timer that holds for `timeout` and then cancels
    // `comp_id`, and the computation itself, which stores its item in a reference cell.
    // When the computation ends in any way, it cancels the timer and triggers the
    // observable source `s` with the stored value; the parent awaits that signal.
    #[doc(hidden)]
    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
    {
        if is_process_cancelled(&pid, p) {
            revoke_process(cont, pid, p)
        } else {
            let TimeoutUsingId { timeout, comp, comp_id } = self;
            let comp_id_clone = comp_id.clone();
            // the source through which the child reports its result to the parent
            let s = Grc::new(ObservableSource::new());
            let s_clone = s.clone();
            ProcessId::new()
                .into_process()
                .and_then(move |timeout_id| {
                    let timeout_id = Grc::new(timeout_id);
                    // the timer process: hold, then cancel the computation
                    spawn_process_using_id_with(ProcessCancellation::CancelChildAfterParent, timeout_id.clone(), {
                        hold_process(timeout)
                            .and_then(move |()| {
                                ProcessId::cancel(comp_id_clone)
                                    .into_process()
                            })
                    })
                    .and_then(move |()| {
                        // stays `None` unless the computation produces an item
                        let r = Grc::new(RefComp::new(None));
                        spawn_process_using_id_with(ProcessCancellation::CancelChildAfterParent, comp_id, {
                            let r_clone = r.clone();
                            comp.and_then(move |item| {
                                RefComp::write(r_clone, Some(item))
                                    .into_process()
                            })
                            .finally({
                                // the finalization: stop the timer and publish the result
                                ProcessId::cancel(timeout_id)
                                    .and_then(move |()| {
                                        RefComp::read(r)
                                            .and_then(move |item| {
                                                s_clone.trigger(item)
                                            })
                                    })
                                    .into_process()
                            })
                        })
                    })
                })
                .and_then(move |()| {
                    // wait for the result published by the finalization
                    process_await(s.publish())
                })
                .call_process(cont, pid, p)
        }
    }

    // Same as `call_process`, but for an already boxed continuation.
    #[doc(hidden)]
    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
        if is_process_cancelled(&pid, p) {
            revoke_process_boxed(cont, pid, p)
        } else {
            let TimeoutUsingId { timeout, comp, comp_id } = self;
            let comp_id_clone = comp_id.clone();
            // the source through which the child reports its result to the parent
            let s = Grc::new(ObservableSource::new());
            let s_clone = s.clone();
            ProcessId::new()
                .into_process()
                .and_then(move |timeout_id| {
                    let timeout_id = Grc::new(timeout_id);
                    // the timer process: hold, then cancel the computation
                    spawn_process_using_id_with(ProcessCancellation::CancelChildAfterParent, timeout_id.clone(), {
                        hold_process(timeout)
                            .and_then(move |()| {
                                ProcessId::cancel(comp_id_clone)
                                    .into_process()
                            })
                    })
                    .and_then(move |()| {
                        // stays `None` unless the computation produces an item
                        let r = Grc::new(RefComp::new(None));
                        spawn_process_using_id_with(ProcessCancellation::CancelChildAfterParent, comp_id, {
                            let r_clone = r.clone();
                            comp.and_then(move |item| {
                                RefComp::write(r_clone, Some(item))
                                    .into_process()
                            })
                            .finally({
                                // the finalization: stop the timer and publish the result
                                ProcessId::cancel(timeout_id)
                                    .and_then(move |()| {
                                        RefComp::read(r)
                                            .and_then(move |item| {
                                                s_clone.trigger(item)
                                            })
                                    })
                                    .into_process()
                            })
                        })
                    })
                })
                .and_then(move |()| {
                    // wait for the result published by the finalization
                    process_await(s.publish())
                })
                .call_process_boxed(cont, pid, p)
        }
    }
}
2298
2299#[must_use = "computations are lazy and do nothing unless to be run"]
2301#[derive(Clone)]
2302pub struct Timeout<M>
2303{
2304 timeout: f64,
2306
2307 comp: M
2309}
2310
2311impl<M> Process for Timeout<M>
2312 where M: Process + 'static,
2313 M::Item: Clone
2314{
2315 type Item = Option<M::Item>;
2316
2317 #[doc(hidden)]
2318 fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
2319 where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
2320 {
2321 if is_process_cancelled(&pid, p) {
2322 revoke_process(cont, pid, p)
2323 } else {
2324 let Timeout { timeout, comp } = self;
2325 ProcessId::new()
2326 .into_process()
2327 .and_then(move |comp_id| {
2328 timeout_process_using_id(timeout, Grc::new(comp_id), comp)
2329 })
2330 .call_process(cont, pid, p)
2331 }
2332 }
2333
2334 #[doc(hidden)]
2335 fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
2336 if is_process_cancelled(&pid, p) {
2337 revoke_process_boxed(cont, pid, p)
2338 } else {
2339 let Timeout { timeout, comp } = self;
2340 ProcessId::new()
2341 .into_process()
2342 .and_then(move |comp_id| {
2343 timeout_process_using_id(timeout, Grc::new(comp_id), comp)
2344 })
2345 .call_process_boxed(cont, pid, p)
2346 }
2347 }
2348}
2349
2350#[doc(hidden)]
2352pub struct FrozenProcess<T> {
2353
2354 comp: EventBox<Option<ProcessBoxCont<T>>>
2356}
2357
2358impl<T> FrozenProcess<T>
2359 where T: 'static
2360{
2361 #[doc(hidden)]
2363 #[inline]
2364 pub fn unfreeze(self, p: &Point) -> simulation::Result<Option<ProcessBoxCont<T>>> {
2365 let FrozenProcess { comp } = self;
2366 comp.call_box((p,))
2367 }
2368
2369 #[doc(hidden)]
2371 pub fn new(cont: ProcessBoxCont<T>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<FrozenProcess<T>> {
2372 let cont = substitute_process_priority_boxed(p.priority, cont);
2373 let rh: Grc<RefComp<Option<DisposableBox>>> = Grc::new(RefComp::new(None));
2374 let rc = Grc::new(RefComp::new(Some(cont)));
2375 let h = {
2376 let rh = rh.clone();
2377 let rc = rc.clone();
2378 let weak_pid = Grc::downgrade(&pid);
2379 pid.cancel_initiating()
2380 .subscribe(cons_observer(move |_, p| {
2381 let pid = weak_pid.upgrade().expect("The process identifier cannot be removed");
2382 let h = rh.swap_at(None, p).unwrap();
2383 h.dispose(p)?;
2384 match rc.swap_at(None, p) {
2385 None => Result::Ok(()),
2386 Some(cont) => {
2387 let pid = pid.clone();
2388 enqueue_event(p.time, {
2389 cons_event(move |p| {
2390 let z = is_process_cancelled(&pid, p);
2391 if z {
2392 revoke_process_boxed(cont, pid, p)
2393 } else {
2394 Result::Ok(())
2395 }
2396 }).into_boxed()
2397 }).call_event(p)
2398 }
2399 }
2400 }))
2401 .call_event(p)
2402 };
2403 match h {
2404 Result::Err(e) => Result::Err(e),
2405 Result::Ok(h) => {
2406 rh.write_at(Some(h), p);
2407 let comp = {
2408 cons_event(move |p| {
2409 let h = rh.swap_at(None, p).unwrap();
2410 h.dispose(p)?;
2411 let cont = rc.swap_at(None, p);
2412 Result::Ok(cont)
2413 }).into_boxed()
2414 };
2415 Result::Ok(FrozenProcess { comp: comp })
2416 }
2417 }
2418 }
2419
2420 #[doc(hidden)]
2422 pub fn with_reentering<M>(cont: ProcessBoxCont<T>, pid: Grc<ProcessId>, val: T, comp: M, p: &Point) -> simulation::Result<FrozenProcess<T>>
2423 where M: Process<Item = T> + 'static
2424 {
2425 let cont = substitute_process_priority_boxed(p.priority, cont);
2426 let rh: Grc<RefComp<Option<DisposableBox>>> = Grc::new(RefComp::new(None));
2427 let rc = Grc::new(RefComp::new(Some(cont)));
2428 let h = {
2429 let rh = rh.clone();
2430 let rc = rc.clone();
2431 let weak_pid = Grc::downgrade(&pid);
2432 pid.cancel_initiating()
2433 .subscribe(cons_observer(move |_, p| {
2434 let pid = weak_pid.upgrade().expect("The process identifier cannot be removed");
2435 let h = rh.swap_at(None, p).unwrap();
2436 h.dispose(p)?;
2437 match rc.swap_at(None, p) {
2438 None => Result::Ok(()),
2439 Some(cont) => {
2440 let pid = pid.clone();
2441 enqueue_event(p.time, {
2442 cons_event(move |p| {
2443 let z = is_process_cancelled(&pid, p);
2444 if z {
2445 revoke_process_boxed(cont, pid, p)
2446 } else {
2447 Result::Ok(())
2448 }
2449 }).into_boxed()
2450 }).call_event(p)
2451 }
2452 }
2453 }))
2454 .call_event(p)
2455 };
2456 match h {
2457 Result::Err(e) => Result::Err(e),
2458 Result::Ok(h) => {
2459 rh.write_at(Some(h), p);
2460 let comp = {
2461 cons_event(move |p| {
2462 let h = rh.swap_at(None, p).unwrap();
2463 h.dispose(p)?;
2464 match rc.swap_at(None, p) {
2465 None => Result::Ok(None),
2466 Some(cont) => {
2467 let f = pid.is_preempted(p);
2468 if !f {
2469 Result::Ok(Some(cont))
2470 } else {
2471 let cont = ProcessBoxCont::new(move |a, pid, p| {
2472 match a {
2473 Result::Ok(_) => comp.call_process_boxed(cont, pid, p),
2474 Result::Err(e) => cont.call_box((Result::Err(e), pid, p))
2475 }
2476 });
2477 match sleep_process(cont, pid, val, p) {
2478 Result::Ok(()) => Result::Ok(None),
2479 Result::Err(e) => Result::Err(e)
2480 }
2481 }
2482 }
2483 }
2484 }).into_boxed()
2485 };
2486 Result::Ok(FrozenProcess { comp: comp })
2487 }
2488 }
2489 }
2490}
2491
2492#[doc(hidden)]
2494pub fn reenter_process<T>(cont: ProcessBoxCont<T>, pid: Grc<ProcessId>, val: T, p: &Point) -> simulation::Result<()>
2495 where T: 'static
2496{
2497 let f = pid.is_preempted(p);
2498 if !f {
2499 enqueue_event(p.time, {
2500 cons_event(move |p| {
2501 let f = pid.is_preempted(p);
2502 if !f {
2503 resume_process_boxed(cont, pid, val, p)
2504 } else {
2505 sleep_process(cont, pid, val, p)
2506 }
2507 }).into_boxed()
2508 }).call_event(p)
2509 } else {
2510 sleep_process(cont, pid, val, p)
2511 }
2512}
2513
/// Puts the process to sleep until the next event of its process identifier.
///
/// The continuation `cont` and the value `val` are kept in reference cells.
/// On `CancelInitiating` the continuation is revoked if the process is cancelled;
/// on `PreemptionEnding` the process re-enters with the stored value.
///
/// Returns an error if subscribing to the process events fails.
///
/// # Panics
///
/// Panics if `PreemptionInitiating` is received while sleeping.
#[doc(hidden)]
pub fn sleep_process<T>(cont: ProcessBoxCont<T>, pid: Grc<ProcessId>, val: T, p: &Point) -> simulation::Result<()>
    where T: 'static
{
    // `rh` holds the subscription handler so that the observer can unsubscribe itself
    let rh: Grc<RefComp<Option<DisposableBox>>> = Grc::new(RefComp::new(None));
    let rc = Grc::new(RefComp::new(Some(cont)));
    let rv = Grc::new(RefComp::new(Some(val)));
    let h = {
        let rh = rh.clone();
        pid.observable()
            .subscribe(cons_observer(move |e, p| {
                // the observer unsubscribes on the first event of any kind
                let h = rh.swap_at(None, p).unwrap();
                h.dispose(p)?;
                match e {
                    &ProcessEvent::CancelInitiating => {
                        let pid = pid.clone();
                        let rc = rc.clone();
                        enqueue_event(p.time, {
                            cons_event(move |p| {
                                let z = is_process_cancelled(&pid, p);
                                if z {
                                    let cont = rc.swap_at(None, p).unwrap();
                                    revoke_process_boxed(cont, pid, p)
                                } else {
                                    Result::Ok(())
                                }
                            }).into_boxed()
                        }).call_event(p)
                    },
                    &ProcessEvent::PreemptionEnding => {
                        let pid = pid.clone();
                        let rc = rc.clone();
                        let rv = rv.clone();
                        enqueue_event(p.time, {
                            cons_event(move |p| {
                                // take both the continuation and the value out of their cells
                                let cont = rc.swap_at(None, p).unwrap();
                                let val = rv.swap_at(None, p).unwrap();
                                reenter_process(cont, pid, val, p)
                            }).into_boxed()
                        }).call_event(p)
                    },
                    &ProcessEvent::PreemptionInitiating => {
                        panic!("The computation was already preempted")
                    }
                }
            }))
            .call_event(p)
    };
    match h {
        Result::Err(e) => Result::Err(e),
        Result::Ok(h) => {
            // store the handler for the observer to dispose later
            rh.write_at(Some(h), p);
            Result::Ok(())
        }
    }
}
2571
2572#[doc(hidden)]
2574pub fn substitute_process<C, T, F>(cont: C, f: F) -> ProcessBoxCont<T>
2575 where C: FnOnce(simulation::Result<T>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static,
2576 T: 'static,
2577 F: FnOnce(C, Grc<ProcessId>, T, &Point) -> simulation::Result<()> + 'static,
2578{
2579 ProcessBoxCont::new(move |a, pid, p| {
2580 match a {
2581 Result::Ok(a) => f(cont, pid, a, p),
2582 Result::Err(e) => cont(Result::Err(e), pid, p)
2583 }
2584 })
2585}
2586
2587#[doc(hidden)]
2589pub fn substitute_process_boxed<T, F>(cont: ProcessBoxCont<T>, f: F) -> ProcessBoxCont<T>
2590 where T: 'static,
2591 F: FnOnce(ProcessBoxCont<T>, Grc<ProcessId>, T, &Point) -> simulation::Result<()> + 'static
2592{
2593 ProcessBoxCont::new(move |a, pid, p| {
2594 match a {
2595 Result::Ok(a) => f(cont, pid, a, p),
2596 Result::Err(e) => cont.call_box((Result::Err(e), pid, p))
2597 }
2598 })
2599}
2600
2601#[doc(hidden)]
2603pub fn substitute_process_priority<C, T>(priority: isize, cont: C) -> ProcessBoxCont<T>
2604 where C: FnOnce(simulation::Result<T>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static,
2605 T: 'static
2606{
2607 ProcessBoxCont::new(move |a, pid, p| {
2608 match a {
2609 Result::Ok(a) => {
2610 if p.priority == priority {
2611 cont(Result::Ok(a), pid, p)
2612 } else {
2613 enqueue_event_with_priority(p.time, priority, {
2614 cons_event(move |p| {
2615 resume_process(cont, pid, a, p)
2616 }).into_boxed()
2617 }).call_event(p)
2618 }
2619 },
2620 Result::Err(e) => {
2621 cont(Result::Err(e), pid, p)
2622 }
2623 }
2624 })
2625}
2626
2627#[doc(hidden)]
2629pub fn substitute_process_priority_boxed<T>(priority: isize, cont: ProcessBoxCont<T>) -> ProcessBoxCont<T>
2630 where T: 'static
2631{
2632 ProcessBoxCont::new(move |a, pid, p| {
2633 match a {
2634 Result::Ok(a) => {
2635 if p.priority == priority {
2636 cont.call_box((Result::Ok(a), pid, p))
2637 } else {
2638 enqueue_event_with_priority(p.time, priority, {
2639 cons_event(move |p| {
2640 resume_process_boxed(cont, pid, a, p)
2641 }).into_boxed()
2642 }).call_event(p)
2643 }
2644 },
2645 Result::Err(e) => {
2646 cont.call_box((Result::Err(e), pid, p))
2647 }
2648 }
2649 })
2650}
2651
2652#[must_use = "computations are lazy and do nothing unless to be run"]
2654#[derive(Clone)]
2655pub struct Await<O, M> {
2656
2657 observable: O,
2659
2660 _phantom: PhantomData<M>
2662}
2663
2664impl<O, M> Process for Await<O, M>
2665 where O: Observable<Message = M>,
2666 M: Clone + 'static
2667{
2668 type Item = M;
2669
2670 #[doc(hidden)]
2671 #[inline]
2672 fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
2673 where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
2674 {
2675 self.call_process_boxed(ProcessBoxCont::new(cont), pid, p)
2676 }
2677
2678 #[doc(hidden)]
2679 fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
2680 match FrozenProcess::new(cont, pid.clone(), p) {
2681 Result::Err(e) => Result::Err(e),
2682 Result::Ok(cont) => {
2683 let rh: Grc<RefComp<Option<DisposableBox>>> = Grc::new(RefComp::new(None));
2684 let rh2: Grc<RefComp<Option<DisposableBox>>> = Grc::new(RefComp::new(None));
2685 let rc = RefComp::new(Some(cont));
2686 let h = {
2687 let rh = rh.clone();
2688 let rh2 = rh2.clone();
2689 let pid = pid.clone();
2690 let Await { observable, _phantom } = self;
2691 observable
2692 .subscribe(cons_observer(move |x: &M, p| {
2693 if let Some(h) = rh.swap_at(None, p) {
2694 h.dispose(p)?;
2695 }
2696 if let Some(h2) = rh2.swap_at(None, p) {
2697 h2.dispose(p)?;
2698 }
2699 let cont = rc.swap_at(None, p).unwrap();
2700 match cont.unfreeze(p) {
2701 Result::Err(e) => Result::Err(e),
2702 Result::Ok(None) => Result::Ok(()),
2703 Result::Ok(Some(cont)) => {
2704 let pid = pid.clone();
2705 reenter_process(cont, pid, x.clone(), p)
2706 }
2707 }
2708 }))
2709 .call_event(p)
2710 };
2711 match h {
2712 Result::Err(e) => return Result::Err(e),
2713 Result::Ok(h) => {
2714 let h2 = {
2715 let rh = rh.clone();
2716 let rh2 = rh2.clone();
2717 pid.cancel_initiating()
2718 .subscribe(cons_observer(move |_, p| {
2719 if let Some(h) = rh.swap_at(None, p) {
2720 h.dispose(p)?;
2721 }
2722 if let Some(h2) = rh2.swap_at(None, p) {
2723 h2.dispose(p)?;
2724 }
2725 Result::Ok(())
2726 }))
2727 .call_event(p)
2728 };
2729 match h2 {
2730 Result::Err(e) => {
2731 h.dispose(p)?;
2732 return Result::Err(e)
2733 },
2734 Result::Ok(h2) => {
2735 rh.write_at(Some(h), p);
2736 rh2.write_at(Some(h2), p);
2737 Result::Ok(())
2738 }
2739 }
2740 }
2741 }
2742 }
2743 }
2744 }
2745}
2746
2747#[must_use = "computations are lazy and do nothing unless to be run"]
2749#[derive(Clone)]
2750pub struct AwaitResult<O, M> {
2751
2752 observable: O,
2754
2755 _phantom: PhantomData<M>
2757}
2758
2759impl<O, M> Process for AwaitResult<O, M>
2760 where O: Observable<Message = simulation::Result<M>>,
2761 M: Clone + 'static
2762{
2763 type Item = M;
2764
2765 #[doc(hidden)]
2766 #[inline]
2767 fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
2768 where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
2769 {
2770 self.call_process_boxed(ProcessBoxCont::new(cont), pid, p)
2771 }
2772
2773 #[doc(hidden)]
2774 fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
2775 match FrozenProcess::new(cont, pid.clone(), p) {
2776 Result::Err(e) => Result::Err(e),
2777 Result::Ok(cont) => {
2778 let rh: Grc<RefComp<Option<DisposableBox>>> = Grc::new(RefComp::new(None));
2779 let rh2: Grc<RefComp<Option<DisposableBox>>> = Grc::new(RefComp::new(None));
2780 let rc = RefComp::new(Some(cont));
2781 let h = {
2782 let rh = rh.clone();
2783 let rh2 = rh2.clone();
2784 let pid = pid.clone();
2785 let AwaitResult { observable, _phantom } = self;
2786 observable
2787 .subscribe(cons_observer(move |x: &simulation::Result<M>, p| {
2788 if let Some(h) = rh.swap_at(None, p) {
2789 h.dispose(p)?;
2790 }
2791 if let Some(h2) = rh2.swap_at(None, p) {
2792 h2.dispose(p)?;
2793 }
2794 let cont = rc.swap_at(None, p).unwrap();
2795 match cont.unfreeze(p) {
2796 Result::Err(e) => Result::Err(e),
2797 Result::Ok(None) => Result::Ok(()),
2798 Result::Ok(Some(cont)) => {
2799 let pid = pid.clone();
2800 match x {
2801 Result::Ok(x) => {
2802 reenter_process(cont, pid, x.clone(), p)
2803 },
2804 Result::Err(Error::Cancel) => {
2805 revoke_process_boxed(cont, pid, p)
2806 },
2807 Result::Err(Error::Other(e)) => {
2808 match e.deref() {
2809 &OtherError::Retry(_) => {
2810 cut_error_process_boxed(cont, pid, e.clone(), p)
2811 },
2812 &OtherError::Panic(_) => {
2813 cut_error_process_boxed(cont, pid, e.clone(), p)
2814 },
2815 &OtherError::IO(_) => {
2816 propagate_error_process_boxed(cont, pid, e.clone(), p)
2817 }
2818 }
2819 }
2820 }
2821 }
2822 }
2823 }))
2824 .call_event(p)
2825 };
2826 match h {
2827 Result::Err(e) => return Result::Err(e),
2828 Result::Ok(h) => {
2829 let h2 = {
2830 let rh = rh.clone();
2831 let rh2 = rh2.clone();
2832 pid.cancel_initiating()
2833 .subscribe(cons_observer(move |_, p| {
2834 if let Some(h) = rh.swap_at(None, p) {
2835 h.dispose(p)?;
2836 }
2837 if let Some(h2) = rh2.swap_at(None, p) {
2838 h2.dispose(p)?;
2839 }
2840 Result::Ok(())
2841 }))
2842 .call_event(p)
2843 };
2844 match h2 {
2845 Result::Err(e) => {
2846 h.dispose(p)?;
2847 return Result::Err(e)
2848 },
2849 Result::Ok(h2) => {
2850 rh.write_at(Some(h), p);
2851 rh2.write_at(Some(h2), p);
2852 Result::Ok(())
2853 }
2854 }
2855 }
2856 }
2857 }
2858 }
2859 }
2860}
2861
2862#[must_use = "computations are lazy and do nothing unless to be run"]
2864#[derive(Clone)]
2865pub struct Transfer<T, M> {
2866
2867 comp: M,
2869
2870 _phantom: PhantomData<T>
2872}
2873
2874impl<T, M> Process for Transfer<T, M>
2875 where M: Process<Item = ()>
2876{
2877 type Item = T;
2878
2879 #[doc(hidden)]
2880 #[inline]
2881 fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
2882 where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
2883 {
2884 if is_process_cancelled(&pid, p) {
2885 revoke_process(cont, pid, p)
2886 } else {
2887 let Transfer { comp, _phantom } = self;
2888 fn cont(a: simulation::Result<()>, _pid: Grc<ProcessId>, _p: &Point) -> simulation::Result<()> {
2889 a
2890 }
2891 comp.call_process(cont, pid, p)
2892 }
2893 }
2894
2895 #[doc(hidden)]
2896 #[inline]
2897 fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
2898 if is_process_cancelled(&pid, p) {
2899 revoke_process_boxed(cont, pid, p)
2900 } else {
2901 let Transfer { comp, _phantom } = self;
2902 fn cont(a: simulation::Result<()>, _pid: Grc<ProcessId>, _p: &Point) -> simulation::Result<()> {
2903 a
2904 }
2905 comp.call_process(cont, pid, p)
2906 }
2907 }
2908}
2909
2910#[must_use = "computations are lazy and do nothing unless to be run"]
2912#[derive(Clone)]
2913pub struct Never<T> {
2914
2915 _phantom: PhantomData<T>
2917}
2918
2919impl<T> Process for Never<T> {
2920
2921 type Item = T;
2922
2923 #[doc(hidden)]
2924 #[inline]
2925 fn call_process<C>(self, _cont: C, _pid: Grc<ProcessId>, _p: &Point) -> simulation::Result<()>
2926 where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
2927 {
2928 Result::Ok(())
2929 }
2930
2931 #[doc(hidden)]
2932 #[inline]
2933 fn call_process_boxed(self, _cont: ProcessBoxCont<Self::Item>, _pid: Grc<ProcessId>, _p: &Point) -> simulation::Result<()> {
2934 Result::Ok(())
2935 }
2936}
2937
2938#[must_use = "computations are lazy and do nothing unless to be run"]
2940#[derive(Clone)]
2941pub struct WhenCancelling<M> {
2942
2943 comp: M
2945}
2946
2947impl<M> Process for WhenCancelling<M>
2948 where M: Observer<Message = (), Item = ()> + 'static
2949{
2950 type Item = ();
2951
2952 #[doc(hidden)]
2953 #[inline]
2954 fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
2955 where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
2956 {
2957 if is_process_cancelled(&pid, p) {
2958 revoke_process(cont, pid, p)
2959 } else {
2960 let WhenCancelling { comp } = self;
2961 pid.cancel_initiating()
2962 .subscribe(cons_observer(move |_, p| {
2963 comp.call_observer(&(), p)
2964 }))
2965 .call_event(p)?;
2966 resume_process(cont, pid, (), p)
2967 }
2968 }
2969
2970 #[doc(hidden)]
2971 #[inline]
2972 fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
2973 if is_process_cancelled(&pid, p) {
2974 revoke_process_boxed(cont, pid, p)
2975 } else {
2976 let WhenCancelling { comp } = self;
2977 pid.cancel_initiating()
2978 .subscribe(cons_observer(move |_, p| {
2979 comp.call_observer(&(), p)
2980 }))
2981 .call_event(p)?;
2982 resume_process_boxed(cont, pid, (), p)
2983 }
2984 }
2985}
2986
2987#[must_use = "computations are lazy and do nothing unless to be run"]
2989#[derive(Clone)]
2990pub struct Panic<T> {
2991
2992 msg: String,
2994
2995 _phantom: PhantomData<T>
2997}
2998
2999impl<T> Process for Panic<T> {
3000
3001 type Item = T;
3002
3003 #[doc(hidden)]
3004 #[inline]
3005 fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
3006 where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
3007 {
3008 if is_process_cancelled(&pid, p) {
3009 revoke_process(cont, pid, p)
3010 } else {
3011 let Panic { msg, _phantom } = self;
3012 let err = Rc::new(OtherError::Panic(msg));
3013 cut_error_process(cont, pid, err, p)
3014 }
3015 }
3016
3017 #[doc(hidden)]
3018 #[inline]
3019 fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
3020 if is_process_cancelled(&pid, p) {
3021 revoke_process_boxed(cont, pid, p)
3022 } else {
3023 let Panic { msg, _phantom } = self;
3024 let err = Rc::new(OtherError::Panic(msg));
3025 cut_error_process_boxed(cont, pid, err, p)
3026 }
3027 }
3028}
3029
3030#[must_use = "computations are lazy and do nothing unless to be run"]
3032#[derive(Clone)]
3033pub struct Trace<M> {
3034
3035 comp: M,
3037
3038 msg: String
3040}
3041
3042impl<M> Process for Trace<M>
3043 where M: Process
3044{
3045 type Item = M::Item;
3046
3047 #[doc(hidden)]
3048 #[inline]
3049 fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
3050 where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
3051 {
3052 if is_process_cancelled(&pid, p) {
3053 revoke_process(cont, pid, p)
3054 } else {
3055 let Trace { comp, msg } = self;
3056 p.trace(&msg);
3057 comp.call_process(cont, pid, p)
3058 }
3059 }
3060
3061 #[doc(hidden)]
3062 #[inline]
3063 fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
3064 if is_process_cancelled(&pid, p) {
3065 revoke_process_boxed(cont, pid, p)
3066 } else {
3067 let Trace { comp, msg } = self;
3068 p.trace(&msg);
3069 comp.call_process_boxed(cont, pid, p)
3070 }
3071 }
3072}
3073#[must_use = "computations are lazy and do nothing unless to be run"]
3075#[derive(Clone)]
3076pub struct ProcessWithPriority {
3077
3078 priority: isize
3080}
3081
3082impl Process for ProcessWithPriority {
3083
3084 type Item = ();
3085
3086 #[doc(hidden)]
3087 #[inline]
3088 fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
3089 where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
3090 {
3091 if is_process_cancelled(&pid, p) {
3092 revoke_process(cont, pid, p)
3093 } else {
3094 let ProcessWithPriority { priority } = self;
3095 if priority == p.priority {
3096 cont(Result::Ok(()), pid, p)
3097 } else {
3098 let comp = cons_event(move |p| {
3099 if is_process_cancelled(&pid, p) {
3100 revoke_process(cont, pid, p)
3101 } else {
3102 cont(Result::Ok(()), pid, p)
3103 }
3104 });
3105 enqueue_event_with_priority(p.time, priority, comp.into_boxed())
3106 .call_event(p)
3107 }
3108 }
3109 }
3110
3111 #[doc(hidden)]
3112 #[inline]
3113 fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
3114 if is_process_cancelled(&pid, p) {
3115 revoke_process_boxed(cont, pid, p)
3116 } else {
3117 let ProcessWithPriority { priority } = self;
3118 if priority == p.priority {
3119 cont.call_box((Result::Ok(()), pid, p))
3120 } else {
3121 let comp = cons_event(move |p| {
3122 if is_process_cancelled(&pid, p) {
3123 revoke_process_boxed(cont, pid, p)
3124 } else {
3125 cont.call_box((Result::Ok(()), pid, p))
3126 }
3127 });
3128 enqueue_event_with_priority(p.time, priority, comp.into_boxed())
3129 .call_event(p)
3130 }
3131 }
3132 }
3133}
3134
3135#[must_use = "computations are lazy and do nothing unless to be run"]
3137#[derive(Clone)]
3138pub struct EmbedResult<T> {
3139
3140 val: simulation::Result<T>
3142}
3143
3144impl<T> Process for EmbedResult<T> {
3145
3146 type Item = T;
3147
3148 #[doc(hidden)]
3149 #[inline]
3150 fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
3151 where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
3152 {
3153 if is_process_cancelled(&pid, p) {
3154 revoke_process(cont, pid, p)
3155 } else {
3156 let EmbedResult { val } = self;
3157 match val {
3158 Result::Ok(a) => {
3159 cont(Result::Ok(a), pid, p)
3160 },
3161 Result::Err(Error::Cancel) => {
3162 let comp = cancel_process();
3163 comp.call_process(cont, pid, p)
3164 },
3165 Result::Err(Error::Other(e)) => {
3166 match e.deref() {
3167 &OtherError::Retry(_) => {
3168 cut_error_process(cont, pid, e, p)
3169 },
3170 &OtherError::Panic(_) => {
3171 cut_error_process(cont, pid, e, p)
3172 },
3173 &OtherError::IO(_) => {
3174 propagate_error_process(cont, pid, e, p)
3175 }
3176 }
3177 }
3178 }
3179 }
3180 }
3181
3182 #[doc(hidden)]
3183 #[inline]
3184 fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
3185 if is_process_cancelled(&pid, p) {
3186 revoke_process_boxed(cont, pid, p)
3187 } else {
3188 let EmbedResult { val } = self;
3189 match val {
3190 Result::Ok(a) => {
3191 cont.call_box((Result::Ok(a), pid, p))
3192 },
3193 Result::Err(Error::Cancel) => {
3194 let comp = cancel_process();
3195 comp.call_process_boxed(cont, pid, p)
3196 },
3197 Result::Err(Error::Other(e)) => {
3198 match e.deref() {
3199 &OtherError::Retry(_) => {
3200 cut_error_process_boxed(cont, pid, e, p)
3201 },
3202 &OtherError::Panic(_) => {
3203 cut_error_process_boxed(cont, pid, e, p)
3204 },
3205 &OtherError::IO(_) => {
3206 propagate_error_process_boxed(cont, pid, e, p)
3207 }
3208 }
3209 }
3210 }
3211 }
3212 }
3213}