dvcompute_cons/simulation/process/
mod.rs

1// Copyright (c) 2020-2022  David Sorokin <davsor@mail.ru>, based in Yoshkar-Ola, Russia
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at https://mozilla.org/MPL/2.0/.
6
7use std::rc::Rc;
8use std::marker::PhantomData;
9use std::ops::Deref;
10
11use crate::simulation;
12use crate::simulation::Point;
13use crate::simulation::error::*;
14use crate::simulation::event::*;
15use crate::simulation::{Run as SimulationRun};
16use crate::simulation::simulation::Simulation;
17use crate::simulation::observable::*;
18use crate::simulation::observable::observer::*;
19use crate::simulation::observable::source::*;
20use crate::simulation::observable::disposable::*;
21use crate::simulation::ref_comp::RefComp;
22
23use dvcompute_utils::grc::Grc;
24
25/// Random processes.
26pub mod random;
27
28/// Additional operations.
29pub mod ops;
30
31/// Return a new `Process` computation by the specified pure value.
32#[inline]
33pub fn return_process<T>(val: T) -> Return<T> {
34    Return { val: val }
35}
36
37/// Return a `Process` computation that panics with the specified error message.
38#[inline]
39pub fn panic_process<T>(msg: String) -> Panic<T> {
40    Panic { msg: msg, _phantom: PhantomData }
41}
42
43/// Delay the `Process` computation.
44#[inline]
45pub fn delay_process<F, M>(f: F) -> Delay<F, M>
46    where F: FnOnce() -> M + 'static,
47          M: Process + 'static
48{
49    Delay { f: f, _phantom: PhantomData }
50}
51
52/// Return the corresponding process identifier.
53#[inline]
54pub fn process_id() -> Id {
55    Id {}
56}
57
58/// Cancel the current process.
59#[inline]
60pub fn cancel_process<T>() -> Cancel<T> {
61    Cancel { _phantom: PhantomData }
62}
63
64/// Hold the process for the specified time interval.
65#[inline]
66pub fn hold_process(dt: f64) -> Hold {
67     Hold { dt: dt }
68}
69
70/// Passivate the process.
71#[inline]
72pub fn passivate_process() -> Passivate {
73    Passivate {}
74}
75
76/// Passivate the process before performing some action.
77#[inline]
78pub fn passivate_process_before<M>(comp: M) -> PassivateBefore<M>
79    where M: Event<Item = ()>
80{
81    PassivateBefore { comp: comp }
82}
83
84/// Execute the `Process` computation in loop.
85#[inline]
86pub fn loop_process<F, M>(f: F) -> Loop<F, M>
87    where F: Fn() -> M + 'static,
88          M: Process<Item = ()> + 'static
89{
90    Loop { f: f, _phantom: PhantomData }
91}
92
93/// Spawn the child process. In case of cancelling one of the processes,
94/// other process will be cancelled too.
95#[inline]
96pub fn spawn_process<M>(comp: M) -> Spawn<M>
97    where M: Process<Item = ()> + 'static
98{
99    spawn_process_with(ProcessCancellation::CancelTogether, comp)
100}
101
102/// Spawn the child process specifying how the child and parent processes
103/// should be cancelled in case of need.
104#[inline]
105pub fn spawn_process_using_id<M>(pid: Grc<ProcessId>, comp: M) -> SpawnUsingId<M>
106    where M: Process<Item = ()> + 'static
107{
108    spawn_process_using_id_with(ProcessCancellation::CancelTogether, pid, comp)
109}
110
111/// Spawn the child process specifying how the child and parent processes
112/// should be cancelled in case of need.
113#[inline]
114pub fn spawn_process_with<M>(cancellation: ProcessCancellation, comp: M) -> Spawn<M>
115    where M: Process<Item = ()> + 'static
116{
117    Spawn { cancellation: cancellation, comp: comp }
118}
119
120/// Spawn the child process specifying how the child and parent processes
121/// should be cancelled in case of need.
122#[inline]
123pub fn spawn_process_using_id_with<M>(cancellation: ProcessCancellation, pid: Grc<ProcessId>, comp: M) -> SpawnUsingId<M>
124    where M: Process<Item = ()> + 'static
125{
126    SpawnUsingId { cancellation: cancellation, comp: comp, comp_id: pid }
127}
128
129/// Await a signal that should be emitted by the specified observable.
130#[inline]
131pub fn process_await<O>(observable: O) -> Await<O, O::Message>
132    where O: Observable,
133          O::Message: Clone + 'static
134{
135    Await { observable: observable, _phantom: PhantomData }
136}
137
138/// Await a cancellable signal that should be emitted by the specified observable.
139#[inline]
140pub fn process_await_result<O, M>(observable: O) -> AwaitResult<O, M>
141    where O: Observable<Message = simulation::Result<M>>,
142          M: Clone + 'static
143{
144    AwaitResult { observable: observable, _phantom: PhantomData }
145}
146
147/// Try to run the child process within the specified timeout.
148#[inline]
149pub fn timeout_process<M>(timeout: f64, comp: M) -> Timeout<M>
150    where M: Process + 'static,
151          M::Item: Clone
152{
153    Timeout { timeout: timeout, comp: comp }
154}
155
156/// Try to run the child process within the specified timeout by using the given identifier.
157#[inline]
158pub fn timeout_process_using_id<M>(timeout: f64, pid: Grc<ProcessId>, comp: M) -> TimeoutUsingId<M>
159    where M: Process + 'static,
160          M::Item: Clone
161{
162    TimeoutUsingId { timeout: timeout, comp: comp, comp_id: pid }
163}
164
165/// Like the GoTo statement it transfers the direction of computation.
166#[inline]
167pub fn transfer_process<T, M>(comp: M) -> Transfer<T, M>
168    where M: Process<Item = ()>
169{
170    Transfer { comp: comp, _phantom: PhantomData }
171}
172
173/// A process that never returns a result.
174#[inline]
175pub fn never_process<T>() -> Never<T> {
176    Never { _phantom: PhantomData }
177}
178
179/// Register a handler that will be invoked in case of cancelling the current process.
180#[inline]
181pub fn when_cancelling_process<M>(comp: M) -> WhenCancelling<M>
182    where M: Observer<Message = (), Item = ()> + 'static
183{
184    WhenCancelling { comp: comp }
185}
186
187/// Embeds the result value within the current process, which may cancel the process, for example.
188#[inline]
189pub fn embed_result_process<T>(val: simulation::Result<T>) -> EmbedResult<T> {
190    EmbedResult { val: val }
191}
192
193/// Create a sequence of computations.
194#[inline]
195pub fn process_sequence<I, M>(comps: I) -> Sequence<I::IntoIter, M>
196    where I: IntoIterator<Item = M> + 'static,
197          M: Process + 'static
198{
199    let comps = comps.into_iter();
200    let acc = {
201        match comps.size_hint() {
202            (_, Some(n)) => Vec::with_capacity(n),
203            (_, None) => Vec::new()
204        }
205    };
206    Sequence { comps: comps, acc: acc }
207}
208
209/// Create a sequence of computations, where the result is ignored.
210#[inline]
211pub fn process_sequence_<I, M>(comps: I) -> Sequence_<I::IntoIter, M>
212    where I: IntoIterator<Item = M> + 'static,
213          M: Process + 'static
214{
215    Sequence_ { comps: comps.into_iter(), _phantom: PhantomData }
216}
217
218/// Proceed with the process that would use the specified time point priority.
219#[inline]
220pub fn process_with_priority(priority: isize) -> ProcessWithPriority {
221    ProcessWithPriority { priority: priority }
222}
223
224/// Wrap the computation so that it would restore its priority.
225#[inline]
226pub fn restore_process_priority<M>(priority: isize, comp: M) -> impl Process<Item = M::Item>
227    where M: Process + 'static
228{
229    process_with_priority(priority).and_then(|()| { comp })
230}
231
232/// Trace the computation.
233#[inline]
234pub fn trace_process<M>(msg: String, comp: M) -> Trace<M>
235    where M: Process
236{
237    Trace { comp: comp, msg: msg}
238}
239
240/// Check whether the `Process` computation should be cancelled.
241#[doc(hidden)]
242#[inline]
243pub fn is_process_cancelled(pid: &ProcessId, p: &Point) -> bool {
244    pid.is_cancel_activated(p)
245}
246
247/// Revoke the `Process` computation.
248#[doc(hidden)]
249pub fn revoke_process<T, C>(cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
250    where C: FnOnce(simulation::Result<T>, Grc<ProcessId>, &Point) -> simulation::Result<()>
251{
252    pid.deactivate_cancel(p);
253    cont(Result::Err(Error::Cancel), pid, p)
254}
255
256/// Revoke the `Process` computation using the boxed continuation.
257#[doc(hidden)]
258pub fn revoke_process_boxed<T>(cont: ProcessBoxCont<T>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
259    pid.deactivate_cancel(p);
260    cont.call_box((Result::Err(Error::Cancel), pid, p))
261}
262
263/// Cut the error `Process` computation.
264#[doc(hidden)]
265pub fn cut_error_process<T, C>(_cont: C, _pid: Grc<ProcessId>, err: Rc<OtherError>, _p: &Point) -> simulation::Result<()>
266    where C: FnOnce(simulation::Result<T>, Grc<ProcessId>, &Point) -> simulation::Result<()>
267{
268    Result::Err(Error::Other(err))
269}
270
271/// Cut the error `Process` computation by using the boxed continuation.
272#[doc(hidden)]
273pub fn cut_error_process_boxed<T>(_cont: ProcessBoxCont<T>, _pid: Grc<ProcessId>, err: Rc<OtherError>, _p: &Point) -> simulation::Result<()> {
274    Result::Err(Error::Other(err))
275}
276
277/// Propagate the error within `Process` computation.
278#[doc(hidden)]
279pub fn propagate_error_process<T, C>(cont: C, pid: Grc<ProcessId>, err: Rc<OtherError>, p: &Point) -> simulation::Result<()>
280    where C: FnOnce(simulation::Result<T>, Grc<ProcessId>, &Point) -> simulation::Result<()>
281{
282    cont(Result::Err(Error::Other(err)), pid, p)
283}
284
285/// Propagate the error within `Process` computation.
286#[doc(hidden)]
287pub fn propagate_error_process_boxed<T>(cont: ProcessBoxCont<T>, pid: Grc<ProcessId>, err: Rc<OtherError>, p: &Point) -> simulation::Result<()> {
288    cont.call_box((Result::Err(Error::Other(err)), pid, p))
289}
290
291/// Resume the `Process` computation.
292#[doc(hidden)]
293#[inline]
294pub fn resume_process<T, C>(cont: C, pid: Grc<ProcessId>, t: T, p: &Point) -> simulation::Result<()>
295    where C: FnOnce(simulation::Result<T>, Grc<ProcessId>, &Point) -> simulation::Result<()>
296{
297    if is_process_cancelled(&pid, p) {
298        revoke_process(cont, pid, p)
299    } else {
300        cont(Result::Ok(t), pid, p)
301    }
302}
303
304/// Resume the `Process` computation using the boxed continuation.
305#[doc(hidden)]
306#[inline]
307pub fn resume_process_boxed<T>(cont: ProcessBoxCont<T>, pid: Grc<ProcessId>, t: T, p: &Point) -> simulation::Result<()> {
308    if is_process_cancelled(&pid, p) {
309        revoke_process_boxed(cont, pid, p)
310    } else {
311        cont.call_box((Result::Ok(t), pid, p))
312    }
313}
314
315/// It defines how the parent and child computations should be cancelled.
316#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)]
317pub enum ProcessCancellation {
318
319    /// Cancel the both computations together.
320    CancelTogether,
321
322    /// Cancel the child if its parent is cancelled.
323    CancelChildAfterParent,
324
325    /// Cancel the parent if its child is cancelled.
326    CancelParentAfterChild,
327
328    /// Cancel the computations in isolation.
329    CancelInIsolation
330}
331
332/// The computation identifier.
333pub struct ProcessId {
334
335    /// Whether the cancellation has been initiated.
336    cancel_initiated: RefComp<bool>,
337
338    /// Whether the cancellation has been activated.
339    cancel_activated: RefComp<bool>,
340
341    /// The count of preemption requests.
342    preemption_count: RefComp<isize>,
343
344    /// The source of events.
345    source: ObservableSource<ProcessEvent>,
346
347    /// Whether the process is already started.
348    started: RefComp<bool>,
349
350    /// Whether the process was passivated.
351    react_flag: RefComp<Option<bool>>,
352
353    /// The reactivation continuation.
354    react_cont: RefComp<Option<ProcessBoxCont<()>>>,
355
356    /// The priority that must be applied in case of reactivation.
357    react_priority: RefComp<isize>,
358
359    /// Whether the process was interrupted prematurely.
360    interrupt_flag: RefComp<Option<bool>>,
361
362    /// The interruption continuation.
363    interrupt_cont: RefComp<Option<ProcessBoxCont<()>>>,
364
365    /// The time of interruption.
366    interrupt_time: RefComp<f64>,
367
368    /// The version of interruption.
369    interrupt_ver: RefComp<i64>,
370
371    /// The priority that must be applied in case of interruption.
372    interrupt_priority: RefComp<isize>
373}
374
375impl ProcessId {
376
377    /// Create a new computation identifier.
378    pub fn new() -> NewProcessId {
379        NewProcessId {}
380    }
381
382    /// Prepare the process identifier.
383    fn prepare(pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
384        let y = pid.started.read_at(p);
385        if y {
386            panic!("Another process with the specified identifier has been started already")
387        } else {
388            pid.started.write_at(true, p)
389        }
390        let disposable = {
391            let observable = pid.observable();
392            let pid = Grc::downgrade(&pid);
393            observable
394                .subscribe(cons_observer(move |e, p| {
395                    match pid.upgrade() {
396                        None => {
397                            let msg = String::from("It should not happen");
398                            let err = Error::retry(msg);
399                            Result::Err(err)
400                        },
401                        Some(pid) => {
402                            match e {
403                                ProcessEvent::CancelInitiating => {
404                                    if pid.is_cancel_activated(p) {
405                                        match ProcessId::interrupt(pid.clone()).call_event(p) {
406                                            Result::Ok(()) => ProcessId::reactivate(pid).call_event(p),
407                                            Result::Err(e) => Result::Err(e)
408                                        }
409                                    } else {
410                                        Result::Ok(())
411                                    }
412                                },
413                                ProcessEvent::PreemptionInitiating => {
414                                    ProcessId::on_preempted_at(pid, p)
415                                },
416                                ProcessEvent::PreemptionEnding => {
417                                    Result::Ok(())
418                                }
419                            }
420                        }
421                    }
422                })).call_event(p)
423        };
424        match disposable {
425            Result::Ok(_) => Result::Ok(()),
426            Result::Err(e) => Result::Err(e)
427        }
428    }
429
430    /// Returns the observable of computation events.
431    #[inline]
432    pub fn observable(&self) -> Publish<ProcessEvent> {
433        self.source.publish()
434    }
435
436    /// Signal when the cancellation is initiated.
437    #[inline]
438    pub fn cancel_initiating(&self) -> impl Observable<Message = ProcessEvent> + Clone {
439        self.observable().filter(|m| { *m == ProcessEvent::CancelInitiating })
440    }
441
442    /// Whether the cancellation was initiated.
443    #[doc(hidden)]
444    #[inline]
445    pub fn is_cancel_initiated_at(&self, p: &Point) -> bool {
446        self.cancel_initiated.read_at(p)
447    }
448
449    /// Whether the cancellation was initiated.
450    #[inline]
451    pub fn is_cancel_initiated(pid: Grc<ProcessId>) -> impl Event<Item = bool> + Clone {
452        cons_event(move |p| Result::Ok(pid.is_cancel_initiated_at(p)))
453    }
454
455    /// Whether the cancellation was activated.
456    #[inline]
457    fn is_cancel_activated(&self, p: &Point) -> bool {
458        self.cancel_activated.read_at(p)
459    }
460
461    /// Deactivate the cancellation.
462    #[inline]
463    fn deactivate_cancel(&self, p: &Point) {
464        self.cancel_activated.write_at(false, p);
465    }
466
467    /// Cancel a process with the specified identifier.
468    #[inline]
469    pub fn cancel(pid: Grc<ProcessId>) -> CancelById {
470        CancelById { pid: pid }
471    }
472
473    /// If the main computation is cancelled then all nested ones will be cancelled too.
474    fn _bind_cancel(this: Grc<ProcessId>, others: Vec<Grc<ProcessId>>, p: &Point) -> simulation::Result<DisposableBox> {
475        let this2 = this.clone();
476        let this1 = this;
477        let others2 = {
478            others.iter().map(|pid| pid.clone()).collect::<Vec<_>>()
479        };
480        let others1 = others;
481        let hs1 = {
482            others1.iter().map(|pid| {
483                let this1 = Grc::downgrade(&this1);
484                pid.cancel_initiating()
485                    .subscribe(cons_observer(move |_, p| {
486                        match this1.upgrade() {
487                            None => Result::Ok(()),
488                            Some(this1) => this1.initiate_cancel_at(p)
489                        }
490                    }))
491            })
492        };
493        let hs1 = event_sequence(hs1).call_event(p);
494        match hs1 {
495            Result::Err(e) => Result::Err(e),
496            Result::Ok(hs1) => {
497                let hs2 = {
498                    others2.iter().map(|pid| {
499                        let pid = Grc::downgrade(&pid);
500                        this2.cancel_initiating()
501                            .subscribe(cons_observer(move |_, p| {
502                                match pid.upgrade() {
503                                    None => Result::Ok(()),
504                                    Some(pid) => pid.initiate_cancel_at(p)
505                                }
506                            }))
507                    })
508                };
509                let hs2 = event_sequence(hs2).call_event(p);
510                match hs2 {
511                    Result::Err(e) => Result::Err(e),
512                    Result::Ok(hs2) => {
513                        let hs1 = concat_disposables(hs1.into_iter());
514                        let hs2 = concat_disposables(hs2.into_iter());
515
516                        Result::Ok(hs1.merge(hs2).into_boxed())
517                    }
518                }
519            }
520        }
521    }
522
523    /// Connect the parent computation to the child one.
524    fn connect_cancel(parent: Grc<ProcessId>, cancellation: ProcessCancellation, child: Grc<ProcessId>, p: &Point) -> simulation::Result<DisposableBox> {
525        let h1 = match cancellation {
526            ProcessCancellation::CancelTogether |
527            ProcessCancellation::CancelChildAfterParent => {
528                let child = Grc::downgrade(&child);
529                parent.cancel_initiating()
530                    .subscribe(cons_observer(move |_, p| {
531                        match child.upgrade() {
532                            None => Result::Ok(()),
533                            Some(child) => child.initiate_cancel_at(p)
534                        }
535                    }))
536                    .call_event(p)
537            },
538            ProcessCancellation::CancelParentAfterChild |
539            ProcessCancellation::CancelInIsolation => {
540                Result::Ok(empty_disposable().into_boxed())
541            }
542        };
543        match h1 {
544            Result::Err(e) => Result::Err(e),
545            Result::Ok(h1) => {
546                let h2 = match cancellation {
547                    ProcessCancellation::CancelTogether |
548                    ProcessCancellation::CancelParentAfterChild => {
549                        let parent = Grc::downgrade(&parent);
550                        child.cancel_initiating()
551                            .subscribe(cons_observer(move |_, p| {
552                                match parent.upgrade() {
553                                    None => Result::Ok(()),
554                                    Some(parent) => parent.initiate_cancel_at(p)
555                                }
556                            }))
557                            .call_event(p)
558                    },
559                    ProcessCancellation::CancelChildAfterParent |
560                    ProcessCancellation::CancelInIsolation => {
561                        Result::Ok(empty_disposable().into_boxed())
562                    }
563                };
564                match h2 {
565                    Result::Err(e) => {
566                        h1.call_box((p,))?;
567                        Result::Err(e)
568                    },
569                    Result::Ok(h2) => Result::Ok(h1.merge(h2).into_boxed())
570                }
571            }
572        }
573    }
574
575    /// Initiate the cancellation.
576    #[doc(hidden)]
577    pub fn initiate_cancel_at(&self, p: &Point) -> simulation::Result<()> {
578        let f = self.cancel_initiated.read_at(p);
579        if !f {
580            self.cancel_initiated.write_at(true, p);
581            self.cancel_activated.write_at(true, p);
582            self.source.trigger_at(&ProcessEvent::CancelInitiating, p)
583        } else {
584            Result::Ok(())
585        }
586    }
587
588    /// Preempt the computation.
589    #[doc(hidden)]
590    pub fn begin_preemption_at(&self, p: &Point) -> simulation::Result<()> {
591        let f = self.cancel_initiated.read_at(p);
592        if !f {
593            let n = self.preemption_count.read_at(p);
594            self.preemption_count.write_at(n + 1, p);
595            if n == 0 {
596                self.source.trigger_at(&ProcessEvent::PreemptionInitiating, p)
597            } else {
598                Result::Ok(())
599            }
600        } else {
601            Result::Ok(())
602        }
603    }
604
605    /// Proceed with the computation after it was preempted earlier.
606    #[doc(hidden)]
607    pub fn end_preemption_at(&self, p: &Point) -> simulation::Result<()> {
608        let f = self.cancel_initiated.read_at(p);
609        if !f {
610            let n = self.preemption_count.read_at(p);
611            self.preemption_count.write_at(n - 1, p);
612            if n - 1 == 0 {
613                self.source.trigger_at(&ProcessEvent::PreemptionEnding, p)
614            } else {
615                Result::Ok(())
616            }
617        } else {
618            Result::Ok(())
619        }
620    }
621
622    /// Preempt the computation.
623    #[inline]
624    pub fn begin_preemption(pid: Grc<ProcessId>) -> impl Event<Item = ()> + Clone {
625        cons_event(move |p| pid.begin_preemption_at(p))
626    }
627
628    /// Proceed with the computation after it was preempted earlier.
629    #[inline]
630    pub fn end_preemption(pid: Grc<ProcessId>) -> impl Event<Item = ()> + Clone {
631        cons_event(move |p| pid.end_preemption_at(p))
632    }
633
634    /// Signal when the computation is preempted.
635    #[inline]
636    pub fn preemption_initiating(&self) -> impl Observable<Message = ProcessEvent> + Clone {
637        self.observable().filter(|m| { *m == ProcessEvent::PreemptionInitiating })
638    }
639
640    /// Signal when the computation is proceeded after it was preempted.
641    #[inline]
642    pub fn preemption_ending(&self) -> impl Observable<Message = ProcessEvent> + Clone {
643        self.observable().filter(|m| { *m == ProcessEvent::PreemptionEnding })
644    }
645
646    /// Whether the computation was preempted.
647    #[inline]
648    fn is_preempted(&self, p: &Point) -> bool {
649        self.preemption_count.read_at(p) > 0
650    }
651
652    /// Interrupt a process with the specified identifier if the process
653    /// is held by computation `hold_process`. Returns an `Event` computation.
654    #[inline]
655    pub fn interrupt(pid: Grc<ProcessId>) -> Interrupt {
656        Interrupt { pid: pid }
657    }
658
659    /// Test whether the process with the specified identifier was interrupted.
660    /// Returns an `Event` computation.
661    #[inline]
662    pub fn is_interrupted(pid: Grc<ProcessId>) -> IsInterrupted {
663        IsInterrupted { pid: pid }
664    }
665
666    /// Return the expected interruption time after finishing the `hold_process` computation,
667    /// which value may change if the corresponding process is preempted.
668    /// Returns an `Event` computation.
669    #[inline]
670    pub fn interruption_time(pid: Grc<ProcessId>) -> InterruptionTime {
671        InterruptionTime { pid: pid }
672    }
673
674    /// Test whether the process with the specified identifier was passivated.
675    /// Returns an `Event` computation.
676    #[inline]
677    pub fn is_passivated(pid: Grc<ProcessId>) -> IsPassivated {
678        IsPassivated { pid: pid }
679    }
680
681    /// Reactivate a process with the specified identifier.
682    #[inline]
683    pub fn reactivate(pid: Grc<ProcessId>) -> Reactivate {
684        Reactivate { pid: pid }
685    }
686
687    /// Reactivate immediately a process with the specified identifier.
688    #[inline]
689    pub fn reactivate_immediately(pid: Grc<ProcessId>) -> ReactivateImmediately {
690        ReactivateImmediately { pid: pid }
691    }
692
693    /// Reactivate immediately the processes with specfified identifies.
694    #[inline]
695    pub fn reactivate_many_immediately<I>(pids: I) -> ReactivateManyImmediately<I::IntoIter>
696        where I: IntoIterator<Item = Grc<ProcessId>> + 'static
697    {
698        ReactivateManyImmediately { pids: pids.into_iter() }
699    }
700
701    /// Define a reaction when the process with the specified identifier is preempted.
702    fn on_preempted_at(pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
703        match pid.interrupt_cont.swap_at(None, p) {
704            Some(cont) => {
705                let t  = pid.interrupt_time.read_at(p);
706                let dt = t - p.time;
707                let v  = pid.interrupt_ver.read_at(p);
708                let priority = pid.interrupt_priority.read_at(p);
709                pid.interrupt_flag.write_at(Some(true), p);
710                pid.interrupt_ver.write_at(v.wrapping_add(1), p);
711                let cont = substitute_process_boxed(cont, move |cont, pid, (), p| {
712                    restore_process_priority(priority, hold_process(dt))
713                        .call_process_boxed(cont, pid, p)
714                });
715                reenter_process(cont, pid, (), p)
716            },
717            None => {
718                match pid.react_cont.swap_at(None, p) {
719                    None => Result::Ok(()),
720                    Some(cont) => {
721                        let cont = substitute_process_boxed(cont, move |cont, pid, (), p| {
722                            reenter_process(cont, pid, (), p)
723                        });
724                        pid.react_cont.write_at(Some(cont), p);
725                        Result::Ok(())
726                    }
727                }
728            }
729        }
730    }
731}
732
733impl PartialEq for ProcessId {
734
735    fn eq(&self, other: &Self) -> bool {
736        self.started == other.started
737    }
738}
739
740impl Eq for ProcessId {}
741
742/// A computation that creates a `ProcessId` value.
743#[must_use = "computations are lazy and do nothing unless to be run"]
744#[derive(Clone)]
745pub struct NewProcessId {}
746
747impl Simulation for NewProcessId {
748
749    type Item = ProcessId;
750
751    #[doc(hidden)]
752    fn call_simulation(self, r: &SimulationRun) -> simulation::Result<Self::Item> {
753        let specs = &r.specs;
754        Result::Ok(ProcessId {
755            cancel_initiated: RefComp::new(false),
756            cancel_activated: RefComp::new(false),
757            preemption_count: RefComp::new(0),
758            source: ObservableSource::new(),
759            started: RefComp::new(false),
760            react_flag: RefComp::new(None),
761            react_cont: RefComp::new(None),
762            react_priority: RefComp::new(0),
763            interrupt_flag: RefComp::new(None),
764            interrupt_cont: RefComp::new(None),
765            interrupt_time: RefComp::new(specs.start_time),
766            interrupt_ver: RefComp::new(0),
767            interrupt_priority: RefComp::new(0)
768        })
769    }
770}
771
772/// The event that occurs whithin the `Process` computation.
773#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord)]
774pub enum ProcessEvent {
775
776    /// Cancel the computation.
777    CancelInitiating,
778
779    /// Preempt the computation.
780    PreemptionInitiating,
781
782    /// Proceed with the computation after it was preempted.
783    PreemptionEnding
784}
785
786/// The computation based on continuations.
787pub trait Process {
788
789    /// The type of the item that is used within the computation.
790    type Item;
791
792    /// Call the `Process` computation.
793    #[doc(hidden)]
794    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
795        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static;
796
797    /// Call the `Process` computation using the boxed continuation.
798    #[doc(hidden)]
799    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>;
800
801    /// Bind the current computation with its continuation within the resulting computation.
802    #[inline]
803    fn and_then<U, F>(self, f: F) -> AndThen<Self, U, F>
804        where Self: Sized + 'static,
805              U: Process + 'static,
806              F: FnOnce(Self::Item) -> U + 'static,
807    {
808        AndThen { comp: self, f: f, _phantom: PhantomData }
809    }
810
811    /// Map the current computation using the specified transform.
812    #[inline]
813    fn map<B, F>(self, f: F) -> Map<Self, B, F>
814        where Self: Sized + 'static,
815              F: FnOnce(Self::Item) -> B + 'static,
816    {
817        Map { comp: self, f: f, _phantom: PhantomData }
818    }
819
820    /// Zip the current computation with another one within the resulting computation.
821    #[inline]
822    fn zip<U>(self, other: U) -> Zip<Self, U>
823        where Self: Sized + 'static,
824              U: Process + 'static
825    {
826        Zip { comp: self, other: other }
827    }
828
829    /// The function application.
830    #[inline]
831    fn ap<U, B>(self, other: U) -> Ap<Self, U, B>
832        where Self: Sized + 'static,
833              Self::Item: FnOnce(U::Item) -> B,
834              U: Process + 'static,
835              B: 'static
836    {
837        Ap { comp: self, other: other, _phantom: PhantomData }
838    }
839
840    /// Finalize the current computation regardless of canceling it or not.
841    #[inline]
842    fn finally<U>(self, finalization: U) -> Finally<Self, U>
843        where Self: Sized + 'static,
844              U: Process<Item = ()> + 'static
845    {
846        Finally { comp: self, finalization: finalization }
847    }
848
849    /// Run the `Process` computation.
850    #[inline]
851    fn run(self) -> Run<Self>
852        where Self: Process<Item = ()> + Sized
853    {
854        Run { comp: self, pid: None }
855    }
856
857    /// Run the `Process` computation using the specified process identifier.
858    #[inline]
859    fn run_using_id(self, pid: Grc<ProcessId>) -> Run<Self>
860        where Self: Process<Item = ()> + Sized
861    {
862        Run { comp: self, pid: Some(pid) }
863    }
864
865    /// Convert into a boxed value.
866    #[inline]
867    fn into_boxed(self) -> ProcessBox<Self::Item>
868        where Self: Sized + 'static
869    {
870        ProcessBox::new(move |cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point| {
871            self.call_process_boxed(cont, pid, p)
872        })
873    }
874}
875
876/// Allows converting to `Process` computations.
877pub trait IntoProcess {
878
879    /// The target computation.
880    type Process: Process<Item = Self::Item>;
881
882    /// The type of item that is returned by the computation.
883    type Item;
884
885    /// Convert to the `Process` computation.
886    fn into_process(self) -> Self::Process;
887}
888
889impl<M: Process> IntoProcess for M {
890
891    type Process = M;
892
893    type Item = M::Item;
894
895    #[inline]
896    fn into_process(self) -> Self::Process {
897        self
898    }
899}
900
901/// The continuation of the boxed `Process` computation.
902#[doc(hidden)]
903pub struct ProcessBoxCont<T> {
904    f: Box<dyn ProcessContFnBox<T>>
905}
906
907#[doc(hidden)]
908impl<T> ProcessBoxCont<T> {
909
910    #[inline]
911    pub fn new<F>(f: F) -> Self
912        where F: FnOnce(simulation::Result<T>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
913    {
914        ProcessBoxCont { f: Box::new(f) }
915    }
916
917    #[inline]
918    pub fn call_box(self, args: (simulation::Result<T>, Grc<ProcessId>, &Point)) -> simulation::Result<()> {
919        self.f.call_box(args)
920    }
921}
922
923/// A trait to support the stable version of Rust, where there is no `FnBox`.
924trait ProcessContFnBox<T> {
925
926    /// Call the corresponding function.
927    fn call_box(self: Box<Self>, args: (simulation::Result<T>, Grc<ProcessId>, &Point)) -> simulation::Result<()>;
928}
929
930impl<T, F> ProcessContFnBox<T> for F
931    where F: for<'a> FnOnce(simulation::Result<T>, Grc<ProcessId>, &'a Point) -> simulation::Result<()>
932{
933    fn call_box(self: Box<Self>, args: (simulation::Result<T>, Grc<ProcessId>, &Point)) -> simulation::Result<()> {
934        let this: Self = *self;
935        this(args.0, args.1, args.2)
936    }
937}
938
939/// The `Process` computation box.
940#[must_use = "computations are lazy and do nothing unless to be run"]
941pub struct ProcessBox<T> {
942    f: Box<dyn ProcessFnBox<T>>
943}
944
945#[doc(hidden)]
946impl<T> ProcessBox<T> {
947
948    #[inline]
949    pub fn new<F>(f: F) -> Self
950        where F: FnOnce(ProcessBoxCont<T>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
951    {
952        ProcessBox { f: Box::new(f) }
953    }
954
955    #[inline]
956    pub fn call_box(self, args: (ProcessBoxCont<T>, Grc<ProcessId>, &Point)) -> simulation::Result<()> {
957        self.f.call_box(args)
958    }
959}
960
961impl<T> Process for ProcessBox<T> {
962
963    type Item = T;
964
965    #[doc(hidden)]
966    #[inline]
967    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
968        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
969    {
970        let cont = ProcessBoxCont::new(cont);
971        self.call_box((cont, pid, p))
972    }
973
974    #[doc(hidden)]
975    #[inline]
976    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
977        self.call_box((cont, pid, p))
978    }
979
980    #[inline]
981    fn into_boxed(self) -> ProcessBox<Self::Item>
982        where Self: Sized + 'static
983    {
984        self
985    }
986}
987
988/// A trait to support the stable version of Rust, where there is no `FnBox`.
989trait ProcessFnBox<T> {
990
991    /// Call the corresponding function.
992    fn call_box(self: Box<Self>, args: (ProcessBoxCont<T>, Grc<ProcessId>, &Point)) -> simulation::Result<()>;
993}
994
995impl<T, F> ProcessFnBox<T> for F
996    where F: for<'a> FnOnce(ProcessBoxCont<T>, Grc<ProcessId>, &'a Point) -> simulation::Result<()>
997{
998    fn call_box(self: Box<Self>, args: (ProcessBoxCont<T>, Grc<ProcessId>, &Point)) -> simulation::Result<()> {
999        let this: Self = *self;
1000        this(args.0, args.1, args.2)
1001    }
1002}
1003
1004/// Allows creating the `Process` computation from a pure value.
1005#[must_use = "computations are lazy and do nothing unless to be run"]
1006#[derive(Clone)]
1007pub struct Return<T> {
1008
1009    /// Return a pure value, which is then transformed to the computation.
1010    val: T
1011}
1012
1013impl<T> Process for Return<T> {
1014
1015    type Item = T;
1016
1017    #[doc(hidden)]
1018    #[inline]
1019    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
1020        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
1021    {
1022        if is_process_cancelled(&pid, p) {
1023            revoke_process(cont, pid, p)
1024        } else {
1025            let Return { val } = self;
1026            cont(Result::Ok(val), pid, p)
1027        }
1028    }
1029
1030    #[doc(hidden)]
1031    #[inline]
1032    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
1033        if is_process_cancelled(&pid, p) {
1034            revoke_process_boxed(cont, pid, p)
1035        } else {
1036            let Return { val } = self;
1037            cont.call_box((Result::Ok(val), pid, p))
1038        }
1039    }
1040}
1041
1042/// Allows delaying the `Process` computation by the specified function.
1043#[must_use = "computations are lazy and do nothing unless to be run"]
1044#[derive(Clone)]
1045pub struct Delay<F, M> {
1046
1047    /// Return the computation.
1048    f: F,
1049
1050    /// To keep the type parameter.
1051    _phantom: PhantomData<M>
1052}
1053
1054impl<F, M> Process for Delay<F, M>
1055    where F: FnOnce() -> M + 'static,
1056          M: Process + 'static
1057{
1058    type Item = M::Item;
1059
1060    #[doc(hidden)]
1061    #[inline]
1062    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
1063        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
1064    {
1065        let Delay { f, _phantom } = self;
1066        return_process(()).and_then(move |_| {
1067            f()
1068        }).call_process(cont, pid, p)
1069    }
1070
1071    #[doc(hidden)]
1072    #[inline]
1073    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
1074        let Delay { f, _phantom } = self;
1075        return_process(()).and_then(move |_| {
1076            f()
1077        }).call_process_boxed(cont, pid, p)
1078    }
1079}
1080
1081/// The monadic bind for the `Process` computation.
1082#[must_use = "computations are lazy and do nothing unless to be run"]
1083#[derive(Clone)]
1084pub struct AndThen<M, U, F> {
1085
1086    /// The current computation.
1087    comp: M,
1088
1089    /// The continuation of the current computation.
1090    f: F,
1091
1092    /// To keep the type parameter.
1093    _phantom: PhantomData<U>
1094}
1095
1096impl<M, U, F> Process for AndThen<M, U, F>
1097    where F: FnOnce(M::Item) -> U + 'static,
1098          M: Process + 'static,
1099          U: Process + 'static
1100{
1101    type Item = U::Item;
1102
1103    #[doc(hidden)]
1104    #[inline]
1105    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
1106        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
1107    {
1108        // m >>= k = \c -> m (\a -> k a c)
1109
1110        if is_process_cancelled(&pid, p) {
1111            revoke_process(cont, pid, p)
1112        } else {
1113            let AndThen { comp, f, _phantom } = self;
1114            let cont = move |a: simulation::Result<M::Item>, pid: Grc<ProcessId>, p: &Point| {
1115                match a {
1116                    Result::Ok(a) => {
1117                        f(a).call_process(cont, pid, p)
1118                    },
1119                    Result::Err(Error::Cancel) => {
1120                        revoke_process(cont, pid, p)
1121                    },
1122                    Result::Err(Error::Other(e)) => {
1123                        match e.deref() {
1124                            &OtherError::Retry(_) => {
1125                                cut_error_process(cont, pid, e, p)
1126                            },
1127                            &OtherError::Panic(_) => {
1128                                cut_error_process(cont, pid, e, p)
1129                            },
1130                            &OtherError::IO(_) => {
1131                                propagate_error_process(cont, pid, e, p)
1132                            }
1133                        }
1134                    }
1135                }
1136            };
1137            comp.call_process(cont, pid, p)
1138        }
1139    }
1140
1141    #[doc(hidden)]
1142    #[inline]
1143    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
1144        // m >>= k = \c -> m (\a -> k a c)
1145
1146        if is_process_cancelled(&pid, p) {
1147            revoke_process_boxed(cont, pid, p)
1148        } else {
1149            let AndThen { comp, f, _phantom } = self;
1150            let cont = move |a: simulation::Result<M::Item>, pid: Grc<ProcessId>, p: &Point| {
1151                match a {
1152                    Result::Ok(a) => {
1153                        f(a).call_process_boxed(cont, pid, p)
1154                    },
1155                    Result::Err(Error::Cancel) => {
1156                        revoke_process_boxed(cont, pid, p)
1157                    },
1158                    Result::Err(Error::Other(e)) => {
1159                        match e.deref() {
1160                            &OtherError::Retry(_) => {
1161                                cut_error_process_boxed(cont, pid, e, p)
1162                            },
1163                            &OtherError::Panic(_) => {
1164                                cut_error_process_boxed(cont, pid, e, p)
1165                            },
1166                            &OtherError::IO(_) => {
1167                                propagate_error_process_boxed(cont, pid, e, p)
1168                            }
1169                        }
1170                    }
1171                }
1172            };
1173            comp.call_process(cont, pid, p)
1174        }
1175    }
1176}
1177
1178/// The finally block for the `Process` computation.
1179#[must_use = "computations are lazy and do nothing unless to be run"]
1180#[derive(Clone)]
1181pub struct Finally<M, U> {
1182
1183    /// The current computation.
1184    comp: M,
1185
1186    /// The finalization computation.
1187    finalization: U
1188}
1189
1190impl<M, U> Process for Finally<M, U>
1191    where M: Process + 'static,
1192          U: Process<Item = ()> + 'static
1193{
1194    type Item = M::Item;
1195
1196    #[doc(hidden)]
1197    #[inline]
1198    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
1199        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
1200    {
1201        if is_process_cancelled(&pid, p) {
1202            revoke_process(cont, pid, p)
1203        } else {
1204            let Finally { comp, finalization } = self;
1205            let cont = move |a: simulation::Result<M::Item>, pid: Grc<ProcessId>, p: &Point| {
1206                let cont = move |a0: simulation::Result<()>, pid: Grc<ProcessId>, p: &Point| {
1207                    match a0 {
1208                        Result::Ok(())  => cont(a, pid, p),
1209                        Result::Err(e0) => {
1210                            match a {
1211                                Result::Ok(_)  => cont(Result::Err(e0), pid, p),
1212                                Result::Err(e) => cont(Result::Err(e.merge(&e0)), pid, p)
1213                            }
1214                        }
1215                    }
1216                };
1217                finalization.call_process(cont, pid, p)
1218            };
1219            comp.call_process(cont, pid, p)
1220        }
1221    }
1222
1223    #[doc(hidden)]
1224    #[inline]
1225    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
1226        if is_process_cancelled(&pid, p) {
1227            revoke_process_boxed(cont, pid, p)
1228        } else {
1229            let Finally { comp, finalization } = self;
1230            let cont = move |a: simulation::Result<M::Item>, pid: Grc<ProcessId>, p: &Point| {
1231                let cont = move |a0: simulation::Result<()>, pid: Grc<ProcessId>, p: &Point| {
1232                    match a0 {
1233                        Result::Ok(()) => cont.call_box((a, pid, p)),
1234                        Result::Err(e0) => {
1235                            match a {
1236                                Result::Ok(_)  => cont.call_box((Result::Err(e0), pid, p)),
1237                                Result::Err(e) => cont.call_box((Result::Err(e.merge(&e0)), pid, p))
1238                            }
1239                        }
1240                    }
1241                };
1242                finalization.call_process(cont, pid, p)
1243            };
1244            comp.call_process(cont, pid, p)
1245        }
1246    }
1247}
1248
1249/// The functor for the `Process` computation.
1250#[must_use = "computations are lazy and do nothing unless to be run"]
1251#[derive(Clone)]
1252pub struct Map<M, B, F> {
1253
1254    /// The current computation.
1255    comp: M,
1256
1257    /// The transform.
1258    f: F,
1259
1260    /// To keep the type parameter.
1261    _phantom: PhantomData<B>
1262}
1263
1264impl<M, B, F> Process for Map<M, B, F>
1265    where F: FnOnce(M::Item) -> B + 'static,
1266          M: Process + 'static,
1267          B: 'static
1268{
1269    type Item = B;
1270
1271    #[doc(hidden)]
1272    #[inline]
1273    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
1274        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
1275    {
1276        let Map { comp, f, _phantom } = self;
1277        comp.and_then(move |a| {
1278            return_process(f(a))
1279        }).call_process(cont, pid, p)
1280    }
1281
1282    #[doc(hidden)]
1283    #[inline]
1284    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
1285        let Map { comp, f, _phantom } = self;
1286        comp.and_then(move |a| {
1287            return_process(f(a))
1288        }).call_process_boxed(cont, pid, p)
1289    }
1290}
1291
1292/// The zip of two `Process` computations.
1293#[must_use = "computations are lazy and do nothing unless to be run"]
1294#[derive(Clone)]
1295pub struct Zip<M, U> {
1296
1297    /// The current computation.
1298    comp: M,
1299
1300    /// Another computation.
1301    other: U,
1302}
1303
1304impl<M, U> Process for Zip<M, U>
1305    where M: Process + 'static,
1306          U: Process + 'static
1307{
1308    type Item = (M::Item, U::Item);
1309
1310    #[doc(hidden)]
1311    #[inline]
1312    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
1313        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
1314    {
1315        let Zip { comp, other } = self;
1316        comp.and_then(move |a| {
1317            other.and_then(move |b| {
1318                return_process((a, b))
1319            })
1320        }).call_process(cont, pid, p)
1321    }
1322
1323    #[doc(hidden)]
1324    #[inline]
1325    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
1326        let Zip { comp, other } = self;
1327        comp.and_then(move |a| {
1328            other.and_then(move |b| {
1329                return_process((a, b))
1330            })
1331        }).call_process_boxed(cont, pid, p)
1332    }
1333}
1334
1335/// The function application for the `Process` computation.
1336#[must_use = "computations are lazy and do nothing unless to be run"]
1337#[derive(Clone)]
1338pub struct Ap<M, U, B> {
1339
1340    /// The current computation.
1341    comp: M,
1342
1343    /// The continuation of the current computation.
1344    other: U,
1345
1346    /// To keep the type parameter.
1347    _phantom: PhantomData<B>
1348}
1349
1350impl<M, U, B> Process for Ap<M, U, B>
1351    where M: Process + 'static,
1352          U: Process + 'static,
1353          M::Item: FnOnce(U::Item) -> B,
1354          B: 'static
1355{
1356    type Item = B;
1357
1358    #[doc(hidden)]
1359    #[inline]
1360    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
1361        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
1362    {
1363        let Ap { comp, other, _phantom } = self;
1364        comp.and_then(move |f| {
1365            other.and_then(move |a| {
1366                return_process(f(a))
1367            })
1368        }).call_process(cont, pid, p)
1369    }
1370
1371    #[doc(hidden)]
1372    #[inline]
1373    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
1374        let Ap { comp, other, _phantom } = self;
1375        comp.and_then(move |f| {
1376            other.and_then(move |a| {
1377                return_process(f(a))
1378            })
1379        }).call_process_boxed(cont, pid, p)
1380    }
1381}
1382
1383/// Run the computation.
1384#[must_use = "computations are lazy and do nothing unless to be run"]
1385pub struct Run<M> {
1386
1387    /// The current computation.
1388    comp: M,
1389
1390    /// The process identifier.
1391    pid: Option<Grc<ProcessId>>
1392}
1393
1394impl<M> Event for Run<M>
1395    where M: Process<Item = ()>
1396{
1397    type Item = M::Item;
1398
1399    #[doc(hidden)]
1400    #[inline]
1401    fn call_event(self, p: &Point) -> simulation::Result<M::Item> {
1402        let Run { comp, pid } = self;
1403        let pid = {
1404            match pid {
1405                Some(pid) => pid,
1406                None => {
1407                    match ProcessId::new().into_event().call_event(p) {
1408                        Result::Ok(pid) => Grc::new(pid),
1409                        Result::Err(e) => return Result::Err(e)
1410                    }
1411                }
1412            }
1413        };
1414        let cont = &initial_cont;
1415        match ProcessId::prepare(pid.clone(), p) {
1416            Result::Err(e) => Result::Err(e),
1417            Result::Ok(()) => comp.call_process(cont, pid, p)
1418        }
1419    }
1420}
1421
1422/// The initial continuation.
1423fn initial_cont(_a: simulation::Result<()>, _pid: Grc<ProcessId>, _p: &Point) -> simulation::Result<()> {
1424    Result::Ok(())
1425}
1426
1427/// Return the corresponding process identifier.
1428#[must_use = "computations are lazy and do nothing unless to be run"]
1429#[derive(Clone)]
1430pub struct Id {}
1431
1432impl Process for Id {
1433
1434    type Item = Grc<ProcessId>;
1435
1436    #[doc(hidden)]
1437    #[inline]
1438    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
1439        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
1440    {
1441        cont(Result::Ok(pid.clone()), pid, p)
1442    }
1443
1444    #[doc(hidden)]
1445    #[inline]
1446    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
1447        cont.call_box((Result::Ok(pid.clone()), pid, p))
1448    }
1449}
1450
1451/// Cancel a process by the specified process identifier.
1452#[must_use = "computations are lazy and do nothing unless to be run"]
1453#[derive(Clone)]
1454pub struct CancelById {
1455
1456    /// The process identifier.
1457    pid: Grc<ProcessId>
1458}
1459
1460impl Event for CancelById {
1461
1462    type Item = ();
1463
1464    #[doc(hidden)]
1465    #[inline]
1466    fn call_event(self, p: &Point) -> simulation::Result<()> {
1467        let CancelById { pid } = self;
1468        pid.initiate_cancel_at(p)
1469    }
1470}
1471
1472/// Cancel the current process.
1473#[must_use = "computations are lazy and do nothing unless to be run"]
1474#[derive(Clone)]
1475pub struct Cancel<T> {
1476
1477    /// To keep the type parameter.
1478    _phantom: PhantomData<T>
1479}
1480
1481impl<T> Process for Cancel<T> {
1482
1483    type Item = T;
1484
1485    #[doc(hidden)]
1486    #[inline]
1487    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
1488        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
1489    {
1490        match pid.initiate_cancel_at(p) {
1491            Result::Ok(()) => {
1492                if is_process_cancelled(&pid, p) {
1493                    revoke_process(cont, pid, p)
1494                } else {
1495                    Result::Ok(())
1496                }
1497            },
1498            Result::Err(e) => Result::Err(e)
1499        }
1500    }
1501
1502    #[doc(hidden)]
1503    #[inline]
1504    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
1505        match pid.initiate_cancel_at(p) {
1506            Result::Ok(()) => {
1507                if is_process_cancelled(&pid, p) {
1508                    revoke_process_boxed(cont, pid, p)
1509                } else {
1510                    Result::Ok(())
1511                }
1512            },
1513            Result::Err(e) => Result::Err(e)
1514        }
1515    }
1516}
1517
1518/// The hold block for the `Process` computation.
1519#[must_use = "computations are lazy and do nothing unless to be run"]
1520#[derive(Clone)]
1521pub struct Hold {
1522
1523    /// The time interval of holding the process.
1524    dt: f64
1525}
1526
1527impl Process for Hold {
1528
1529    type Item = ();
1530
1531    #[doc(hidden)]
1532    #[inline]
1533    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
1534        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
1535    {
1536        let cont = ProcessBoxCont::new(cont);
1537        self.call_process_boxed(cont, pid, p)
1538    }
1539
1540    #[doc(hidden)]
1541    #[inline]
1542    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
1543        let Hold { dt } = self;
1544        let t = p.time + dt;
1545        pid.interrupt_cont.write_at(Some(cont), p);
1546        pid.interrupt_flag.write_at(Some(false), p);
1547        pid.interrupt_time.write_at(t, p);
1548        pid.interrupt_priority.write_at(p.priority, p);
1549        let v = pid.interrupt_ver.read_at(p);
1550        enqueue_event(t, {
1551            cons_event(move |p| {
1552                if p.priority >= p.minimal_priority {
1553                    let v2 = pid.interrupt_ver.read_at(p);
1554                    if v == v2 {
1555                        pid.interrupt_flag.write_at(None, p);
1556                        let cont = pid.interrupt_cont.swap_at(None, p).unwrap();
1557                        resume_process_boxed(cont, pid, (), p)
1558                    } else {
1559                        Result::Ok(())
1560                    }
1561
1562                } else {
1563                    pid.initiate_cancel_at(p)?;
1564                    if is_process_cancelled(&pid, p) {
1565                        let v2 = pid.interrupt_ver.read_at(p);
1566                        if v == v2 {
1567                            pid.interrupt_flag.write_at(None, p);
1568                            let cont = pid.interrupt_cont.swap_at(None, p).unwrap();
1569                            revoke_process_boxed(cont, pid, p)
1570                        } else {
1571                            Result::Ok(())
1572                        }
1573                    } else {
1574                        Result::Ok(())
1575                    }
1576                }
1577            }).into_boxed()
1578        }).call_event(p)
1579    }
1580}
1581
1582/// Interrupt a process with the specified identifier if the process
1583/// is held by computation `hold_process`.
1584#[must_use = "computations are lazy and do nothing unless to be run"]
1585#[derive(Clone)]
1586pub struct Interrupt {
1587
1588    /// The process identifier.
1589    pid: Grc<ProcessId>
1590}
1591
1592impl Event for Interrupt {
1593
1594    type Item = ();
1595
1596    #[doc(hidden)]
1597    #[inline]
1598    fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
1599        let Interrupt { pid } = self;
1600        match pid.interrupt_cont.swap_at(None, p) {
1601            None => Result::Ok(()),
1602            Some(cont) => {
1603                let v = pid.interrupt_ver.read_at(p);
1604                let priority = pid.interrupt_priority.read_at(p);
1605                pid.interrupt_ver.write_at(v.wrapping_add(1), p);
1606                pid.interrupt_flag.write_at(Some(true), p);
1607                enqueue_event_with_priority(p.time, priority, {
1608                    cons_event(move |p| {
1609                        resume_process_boxed(cont, pid, (), p)
1610                    }).into_boxed()
1611                }).call_event(p)
1612            }
1613        }
1614    }
1615}
1616
1617/// Test whether the process with the specified identifier was interrupted.
1618#[must_use = "computations are lazy and do nothing unless to be run"]
1619#[derive(Clone)]
1620pub struct IsInterrupted {
1621
1622    /// The process identifier.
1623    pid: Grc<ProcessId>
1624}
1625
1626impl Event for IsInterrupted {
1627
1628    type Item = bool;
1629
1630    #[doc(hidden)]
1631    #[inline]
1632    fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
1633        let IsInterrupted { pid } = self;
1634        match pid.interrupt_flag.read_at(p) {
1635            Some(true) => Result::Ok(true),
1636            _ => Result::Ok(false)
1637        }
1638    }
1639}
1640
1641/// Return the expected interruption time after finishing the `hold_process` computation,
1642/// which value may change if the corresponding process is preempted.
1643#[must_use = "computations are lazy and do nothing unless to be run"]
1644#[derive(Clone)]
1645pub struct InterruptionTime {
1646
1647    /// The process identifier.
1648    pid: Grc<ProcessId>
1649}
1650
1651impl Event for InterruptionTime {
1652
1653    type Item = Option<f64>;
1654
1655    #[doc(hidden)]
1656    #[inline]
1657    fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
1658        let InterruptionTime { pid } = self;
1659        match pid.interrupt_flag.read_at(p) {
1660            Some(false) => Result::Ok(Some(pid.interrupt_time.read_at(p))),
1661            _ => Result::Ok(None)
1662        }
1663    }
1664}
1665
1666/// Passivate the `Process` computation.
1667#[must_use = "computations are lazy and do nothing unless to be run"]
1668#[derive(Clone)]
1669pub struct Passivate {}
1670
1671impl Process for Passivate {
1672
1673    type Item = ();
1674
1675    #[doc(hidden)]
1676    #[inline]
1677    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
1678        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
1679    {
1680        let cont = ProcessBoxCont::new(cont);
1681        self.call_process_boxed(cont, pid, p)
1682    }
1683
1684    #[doc(hidden)]
1685    #[inline]
1686    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
1687        match pid.react_flag.read_at(p) {
1688            Some(true) => {
1689                let msg = String::from("Cannot passivate the process twice");
1690                let err = OtherError::retry(msg);
1691                cut_error_process_boxed(cont, pid, err, p)
1692            },
1693            _ => {
1694                match pid.react_cont.swap_at(Some(cont), p) {
1695                    Some(_) => panic!("The reactivation continuation has diverged"),
1696                    None => {
1697                        pid.react_priority.write_at(p.priority, p);
1698                        pid.react_flag.write_at(Some(true), p);
1699                        Result::Ok(())
1700                    }
1701                }
1702            }
1703        }
1704    }
1705}
1706
1707/// Passivate the `Process` computation before performing some action.
1708#[must_use = "computations are lazy and do nothing unless to be run"]
1709#[derive(Clone)]
1710pub struct PassivateBefore<M> {
1711
1712    /// The action to perform after passivating the process.
1713    comp: M
1714}
1715
1716impl<M> Process for PassivateBefore<M>
1717    where M: Event<Item = ()>
1718{
1719    type Item = ();
1720
1721    #[doc(hidden)]
1722    #[inline]
1723    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
1724        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
1725    {
1726        let cont = ProcessBoxCont::new(cont);
1727        self.call_process_boxed(cont, pid, p)
1728    }
1729
1730    #[doc(hidden)]
1731    #[inline]
1732    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
1733        match pid.react_flag.read_at(p) {
1734            Some(true) => {
1735                let msg = String::from("Cannot passivate the process twice");
1736                let err = OtherError::retry(msg);
1737                cut_error_process_boxed(cont, pid, err, p)
1738            },
1739            _ => {
1740                match pid.react_cont.swap_at(Some(cont), p) {
1741                    Some(_) => panic!("The reactivation continuation has diverged"),
1742                    None => {
1743                        let PassivateBefore { comp } = self;
1744                        pid.react_priority.write_at(p.priority, p);
1745                        pid.react_flag.write_at(Some(true), p);
1746                        comp.call_event(p)
1747                    }
1748                }
1749            }
1750        }
1751    }
1752}
1753
1754/// Test whether the process with the specified identifier was passivated.
1755#[must_use = "computations are lazy and do nothing unless to be run"]
1756#[derive(Clone)]
1757pub struct IsPassivated {
1758
1759    /// The process identifier.
1760    pid: Grc<ProcessId>
1761}
1762
1763impl Event for IsPassivated {
1764
1765    type Item = bool;
1766
1767    #[doc(hidden)]
1768    #[inline]
1769    fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
1770        let IsPassivated { pid } = self;
1771        match pid.react_flag.read_at(p) {
1772            Some(true) => Result::Ok(true),
1773            _ => Result::Ok(false)
1774        }
1775    }
1776}
1777
1778/// Reactivate the process with the specified identifier.
1779#[must_use = "computations are lazy and do nothing unless to be run"]
1780#[derive(Clone)]
1781pub struct Reactivate {
1782
1783    /// The process identifier.
1784    pid: Grc<ProcessId>
1785}
1786
1787impl Event for Reactivate {
1788
1789    type Item = ();
1790
1791    #[doc(hidden)]
1792    #[inline]
1793    fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
1794        let Reactivate { pid } = self;
1795        match pid.react_cont.swap_at(None, p) {
1796            Some(cont) => {
1797                match pid.react_flag.swap_at(None, p) {
1798                    Some(true) => {
1799                        let priority = pid.react_priority.read_at(p);
1800                        enqueue_event_with_priority(p.time, priority, {
1801                            cons_event(move |p| {
1802                                resume_process_boxed(cont, pid, (), p)
1803                            }).into_boxed()
1804                        }).call_event(p)
1805                    },
1806                    _ => {
1807                        panic!("The reactivation continuation has diverged")
1808                    }
1809                }
1810            },
1811            None => {
1812                Result::Ok(())
1813            }
1814        }
1815    }
1816}
1817
1818/// Reactivate immediately the process with the specified identifier.
1819#[must_use = "computations are lazy and do nothing unless to be run"]
1820#[derive(Clone)]
1821pub struct ReactivateImmediately {
1822
1823    /// The process identifier.
1824    pid: Grc<ProcessId>
1825}
1826
1827impl Event for ReactivateImmediately {
1828
1829    type Item = ();
1830
1831    #[doc(hidden)]
1832    #[inline]
1833    fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
1834        let ReactivateImmediately { pid } = self;
1835        match pid.react_cont.swap_at(None, p) {
1836            Some(cont) => {
1837                match pid.react_flag.swap_at(None, p) {
1838                    Some(true) => {
1839                        let priority = pid.react_priority.read_at(p);
1840                        if p.priority == priority {
1841                            resume_process_boxed(cont, pid, (), p)
1842                        } else {
1843                            enqueue_event_with_priority(p.time, priority, {
1844                                cons_event(move |p| {
1845                                    resume_process_boxed(cont, pid, (), p)
1846                                }).into_boxed()
1847                            }).call_event(p)
1848                        }
1849                    },
1850                    _ => {
1851                        panic!("The reactivation continuation has diverged")
1852                    }
1853                }
1854            },
1855            None => {
1856                Result::Ok(())
1857            }
1858        }
1859    }
1860}
1861
1862/// Reactivate immediately the processes with specified identifiers.
1863#[must_use = "computations are lazy and do nothing unless to be run"]
1864#[derive(Clone)]
1865pub struct ReactivateManyImmediately<I> {
1866
1867    /// The process identifiers.
1868    pids: I
1869}
1870
1871impl<I> Event for ReactivateManyImmediately<I>
1872    where I: Iterator<Item = Grc<ProcessId>> + 'static
1873{
1874    type Item = ();
1875
1876    #[doc(hidden)]
1877    #[inline]
1878    fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
1879        let ReactivateManyImmediately { mut pids } = self;
1880        match pids.next() {
1881            None => Result::Ok(()),
1882            Some(pid) => {
1883                match ProcessId::reactivate_immediately(pid).call_event(p) {
1884                    Result::Err(e) => Result::Err(e),
1885                    Result::Ok(()) => {
1886                        let comp = ReactivateManyImmediately { pids: pids };
1887                        yield_event(comp).call_event(p)
1888                    }
1889                }
1890            }
1891        }
1892    }
1893}
1894
1895/// Executes the `Process` computation in loop.
1896#[must_use = "computations are lazy and do nothing unless to be run"]
1897#[derive(Clone)]
1898pub struct Loop<F, M> {
1899
1900    /// The computation generator.
1901    f: F,
1902
1903    /// The type keeper.
1904    _phantom: PhantomData<M>
1905}
1906
1907impl<F, M> Process for Loop<F, M>
1908    where F: Fn() -> M + 'static,
1909          M: Process<Item = ()> + 'static
1910{
1911    type Item = M::Item;
1912
1913    #[doc(hidden)]
1914    #[inline(never)]
1915    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
1916        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
1917    {
1918        let Loop { f, _phantom } = self;
1919        let comp = f();
1920        comp.and_then(move |()| {
1921            Loop { f: f, _phantom: PhantomData }
1922        }).call_process(cont, pid, p)
1923    }
1924
1925    #[doc(hidden)]
1926    #[inline(never)]
1927    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
1928        let Loop { f, _phantom } = self;
1929        let comp = f();
1930        comp.and_then(move |()| {
1931            Loop { f: f, _phantom: PhantomData }
1932        }).call_process_boxed(cont, pid, p)
1933    }
1934}
1935
1936/// The sequence of computations.
1937#[must_use = "computations are lazy and do nothing unless to be run"]
1938#[derive(Clone)]
1939pub struct Sequence<I, M> where M: Process {
1940
1941    /// The computations.
1942    comps: I,
1943
1944    /// The accumulator of results.
1945    acc: Vec<M::Item>
1946}
1947
1948impl<I, M> Process for Sequence<I, M>
1949    where I: Iterator<Item = M> + 'static,
1950          M: Process + 'static
1951{
1952    type Item = Vec<M::Item>;
1953
1954    #[doc(hidden)]
1955    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
1956        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
1957    {
1958        let Sequence { mut comps, mut acc } = self;
1959        match comps.next() {
1960            None => cont(Result::Ok(acc), pid, p),
1961            Some(comp) => {
1962                comp.and_then(move |a| {
1963                    acc.push(a);
1964                    Sequence { comps: comps, acc: acc }
1965                }).call_process(cont, pid, p)
1966            }
1967        }
1968    }
1969
1970    #[doc(hidden)]
1971    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
1972        let Sequence { mut comps, mut acc } = self;
1973        match comps.next() {
1974            None => cont.call_box((Result::Ok(acc), pid, p)),
1975            Some(comp) => {
1976                comp.and_then(move |a| {
1977                    acc.push(a);
1978                    Sequence { comps: comps, acc: acc }
1979                }).call_process_boxed(cont, pid, p)
1980            }
1981        }
1982    }
1983}
1984
1985/// The sequence of computations with ignored result.
1986#[must_use = "computations are lazy and do nothing unless to be run"]
1987#[derive(Clone)]
1988pub struct Sequence_<I, M>
1989{
1990    /// The computations.
1991    comps: I,
1992
1993    /// To keep the type parameter.
1994    _phantom: PhantomData<M>
1995}
1996
1997impl<I, M> Process for Sequence_<I, M>
1998    where I: Iterator<Item = M> + 'static,
1999          M: Process + 'static
2000{
2001    type Item = ();
2002
2003    #[doc(hidden)]
2004    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
2005        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
2006    {
2007        let Sequence_ { mut comps, _phantom } = self;
2008        match comps.next() {
2009            None => cont(Result::Ok(()), pid, p),
2010            Some(comp) => {
2011                comp.and_then(move |_| {
2012                    Sequence_ { comps: comps, _phantom: _phantom }
2013                }).call_process(cont, pid, p)
2014            }
2015        }
2016    }
2017
2018    #[doc(hidden)]
2019    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
2020        let Sequence_ { mut comps, _phantom } = self;
2021        match comps.next() {
2022            None => cont.call_box((Result::Ok(()), pid, p)),
2023            Some(comp) => {
2024                comp.and_then(move |_| {
2025                    Sequence_ { comps: comps, _phantom: _phantom }
2026                }).call_process_boxed(cont, pid, p)
2027            }
2028        }
2029    }
2030}
2031
2032/// Allows spawning another process bound with the current one.
2033#[must_use = "computations are lazy and do nothing unless to be run"]
2034#[derive(Clone)]
2035pub struct SpawnUsingId<M>
2036{
2037    /// How to cancel the processes.
2038    cancellation: ProcessCancellation,
2039
2040    /// The computation.
2041    comp: M,
2042
2043    /// The child process identifier.
2044    comp_id: Grc<ProcessId>
2045}
2046
2047impl<M> Process for SpawnUsingId<M>
2048    where M: Process<Item = ()> + 'static,
2049{
2050    type Item = ();
2051
2052    #[doc(hidden)]
2053    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
2054        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
2055    {
2056        if is_process_cancelled(&pid, p) {
2057            revoke_process(cont, pid, p)
2058        } else {
2059            let SpawnUsingId { cancellation, comp, comp_id } = self;
2060            match ProcessId::prepare(comp_id.clone(), p) {
2061                Result::Err(e) => Result::Err(e),
2062                Result::Ok(()) => {
2063                    let hs = ProcessId::connect_cancel(pid.clone(), cancellation, comp_id.clone(), p);
2064                    match hs {
2065                        Result::Err(e) => Result::Err(e),
2066                        Result::Ok(hs) => {
2067                            enqueue_event(p.time, {
2068                                cons_event(move |p| {
2069                                    let cont = move |a: simulation::Result<()>, _pid: Grc<ProcessId>, p: &Point| {
2070                                        hs.dispose(p)?;
2071                                        match a {
2072                                            Result::Ok(()) => Result::Ok(()),
2073                                            Result::Err(Error::Cancel) => Result::Ok(()),
2074                                            Result::Err(Error::Other(e)) => Result::Err(Error::Other(e))
2075                                        }
2076                                    };
2077                                    comp.call_process(cont, comp_id, p)
2078                                }).into_boxed()
2079                            }).and_then(move |()| {
2080                                cons_event(move |p| {
2081                                    resume_process(cont, pid, (), p)
2082                                })
2083                            }).call_event(p)
2084                        }
2085                    }
2086                }
2087            }
2088        }
2089    }
2090
2091    #[doc(hidden)]
2092    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
2093        if is_process_cancelled(&pid, p) {
2094            revoke_process_boxed(cont, pid, p)
2095        } else {
2096            let SpawnUsingId { cancellation, comp, comp_id } = self;
2097            match ProcessId::prepare(comp_id.clone(), p) {
2098                Result::Err(e) => Result::Err(e),
2099                Result::Ok(()) => {
2100                    let hs = ProcessId::connect_cancel(pid.clone(), cancellation, comp_id.clone(), p);
2101                    match hs {
2102                        Result::Err(e) => Result::Err(e),
2103                        Result::Ok(hs) => {
2104                            enqueue_event(p.time, {
2105                                cons_event(move |p| {
2106                                    let cont = move |a: simulation::Result<()>, _pid: Grc<ProcessId>, p: &Point| {
2107                                        hs.dispose(p)?;
2108                                        match a {
2109                                            Result::Ok(()) => Result::Ok(()),
2110                                            Result::Err(Error::Cancel) => Result::Ok(()),
2111                                            Result::Err(Error::Other(e)) => Result::Err(Error::Other(e))
2112                                        }
2113                                    };
2114                                    comp.call_process(cont, comp_id, p)
2115                                }).into_boxed()
2116                            }).and_then(move |()| {
2117                                cons_event(move |p| {
2118                                    resume_process_boxed(cont, pid, (), p)
2119                                })
2120                            }).call_event(p)
2121                        }
2122                    }
2123                }
2124            }
2125        }
2126    }
2127}
2128
2129/// Allows spawning another process bound with the current one.
2130#[must_use = "computations are lazy and do nothing unless to be run"]
2131#[derive(Clone)]
2132pub struct Spawn<M>
2133{
2134    /// How to cancel the processes.
2135    cancellation: ProcessCancellation,
2136
2137    /// The computation.
2138    comp: M
2139}
2140
2141impl<M> Process for Spawn<M>
2142    where M: Process<Item = ()> + 'static
2143{
2144    type Item = ();
2145
2146    #[doc(hidden)]
2147    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
2148        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
2149    {
2150        if is_process_cancelled(&pid, p) {
2151            revoke_process(cont, pid, p)
2152        } else {
2153            let Spawn { cancellation, comp } = self;
2154            ProcessId::new()
2155                .into_process()
2156                .and_then(move |pid| {
2157                    spawn_process_using_id_with(cancellation, Grc::new(pid), comp)
2158                })
2159                .call_process(cont, pid, p)
2160        }
2161    }
2162
2163    #[doc(hidden)]
2164    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
2165        if is_process_cancelled(&pid, p) {
2166            revoke_process_boxed(cont, pid, p)
2167        } else {
2168            let Spawn { cancellation, comp } = self;
2169            ProcessId::new()
2170                .into_process()
2171                .and_then(move |pid| {
2172                    spawn_process_using_id_with(cancellation, Grc::new(pid), comp)
2173                })
2174                .call_process_boxed(cont, pid, p)
2175        }
2176    }
2177}
2178
2179/// Allows running another process within the specified timeout by using the given identifier.
2180#[must_use = "computations are lazy and do nothing unless to be run"]
2181#[derive(Clone)]
2182pub struct TimeoutUsingId<M>
2183{
2184    /// Timeout.
2185    timeout: f64,
2186
2187    /// The computation.
2188    comp: M,
2189
2190    /// The child process identifier.
2191    comp_id: Grc<ProcessId>
2192}
2193
2194impl<M> Process for TimeoutUsingId<M>
2195    where M: Process + 'static,
2196          M::Item: Clone
2197{
2198    type Item = Option<M::Item>;
2199
2200    #[doc(hidden)]
2201    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
2202        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
2203    {
2204        if is_process_cancelled(&pid, p) {
2205            revoke_process(cont, pid, p)
2206        } else {
2207            let TimeoutUsingId { timeout, comp, comp_id } = self;
2208            let comp_id_clone = comp_id.clone();
2209            let s = Grc::new(ObservableSource::new());
2210            let s_clone = s.clone();
2211            ProcessId::new()
2212                .into_process()
2213                .and_then(move |timeout_id| {
2214                    let timeout_id = Grc::new(timeout_id);
2215                    spawn_process_using_id_with(ProcessCancellation::CancelChildAfterParent, timeout_id.clone(), {
2216                        hold_process(timeout)
2217                            .and_then(move |()| {
2218                                ProcessId::cancel(comp_id_clone)
2219                                    .into_process()
2220                            })
2221                    })
2222                    .and_then(move |()| {
2223                        let r = Grc::new(RefComp::new(None));
2224                        spawn_process_using_id_with(ProcessCancellation::CancelChildAfterParent, comp_id, {
2225                            let r_clone = r.clone();
2226                            comp.and_then(move |item| {
2227                                    RefComp::write(r_clone, Some(item))
2228                                        .into_process()
2229                                })
2230                                .finally({
2231                                    ProcessId::cancel(timeout_id)
2232                                        .and_then(move |()| {
2233                                            RefComp::read(r)
2234                                                .and_then(move |item| {
2235                                                    s_clone.trigger(item)
2236                                                })
2237                                        })
2238                                        .into_process()
2239                                })
2240                        })
2241                    })
2242                })
2243                .and_then(move |()| {
2244                    process_await(s.publish())
2245                })
2246                .call_process(cont, pid, p)
2247        }
2248    }
2249
2250    #[doc(hidden)]
2251    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
2252        if is_process_cancelled(&pid, p) {
2253            revoke_process_boxed(cont, pid, p)
2254        } else {
2255            let TimeoutUsingId { timeout, comp, comp_id } = self;
2256            let comp_id_clone = comp_id.clone();
2257            let s = Grc::new(ObservableSource::new());
2258            let s_clone = s.clone();
2259            ProcessId::new()
2260                .into_process()
2261                .and_then(move |timeout_id| {
2262                    let timeout_id = Grc::new(timeout_id);
2263                    spawn_process_using_id_with(ProcessCancellation::CancelChildAfterParent, timeout_id.clone(), {
2264                        hold_process(timeout)
2265                            .and_then(move |()| {
2266                                ProcessId::cancel(comp_id_clone)
2267                                    .into_process()
2268                            })
2269                    })
2270                    .and_then(move |()| {
2271                        let r = Grc::new(RefComp::new(None));
2272                        spawn_process_using_id_with(ProcessCancellation::CancelChildAfterParent, comp_id, {
2273                            let r_clone = r.clone();
2274                            comp.and_then(move |item| {
2275                                    RefComp::write(r_clone, Some(item))
2276                                        .into_process()
2277                                })
2278                                .finally({
2279                                    ProcessId::cancel(timeout_id)
2280                                        .and_then(move |()| {
2281                                            RefComp::read(r)
2282                                                .and_then(move |item| {
2283                                                    s_clone.trigger(item)
2284                                                })
2285                                        })
2286                                        .into_process()
2287                                })
2288                        })
2289                    })
2290                })
2291                .and_then(move |()| {
2292                    process_await(s.publish())
2293                })
2294                .call_process_boxed(cont, pid, p)
2295        }
2296    }
2297}
2298
2299/// Allows running another process within the specified timeout.
2300#[must_use = "computations are lazy and do nothing unless to be run"]
2301#[derive(Clone)]
2302pub struct Timeout<M>
2303{
2304    /// Timeout.
2305    timeout: f64,
2306
2307    /// The computation.
2308    comp: M
2309}
2310
2311impl<M> Process for Timeout<M>
2312    where M: Process + 'static,
2313          M::Item: Clone
2314{
2315    type Item = Option<M::Item>;
2316
2317    #[doc(hidden)]
2318    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
2319        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
2320    {
2321        if is_process_cancelled(&pid, p) {
2322            revoke_process(cont, pid, p)
2323        } else {
2324            let Timeout { timeout, comp } = self;
2325            ProcessId::new()
2326                .into_process()
2327                .and_then(move |comp_id| {
2328                    timeout_process_using_id(timeout, Grc::new(comp_id), comp)
2329                })
2330                .call_process(cont, pid, p)
2331        }
2332    }
2333
2334    #[doc(hidden)]
2335    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
2336        if is_process_cancelled(&pid, p) {
2337            revoke_process_boxed(cont, pid, p)
2338        } else {
2339            let Timeout { timeout, comp } = self;
2340            ProcessId::new()
2341                .into_process()
2342                .and_then(move |comp_id| {
2343                    timeout_process_using_id(timeout, Grc::new(comp_id), comp)
2344                })
2345                .call_process_boxed(cont, pid, p)
2346        }
2347    }
2348}
2349
2350/// Represents a temporarily frozen computation.
2351#[doc(hidden)]
2352pub struct FrozenProcess<T> {
2353
2354    /// The frozen continuation.
2355    comp: EventBox<Option<ProcessBoxCont<T>>>
2356}
2357
2358impl<T> FrozenProcess<T>
2359    where T: 'static
2360{
2361    /// Unfreeze the process computation.
2362    #[doc(hidden)]
2363    #[inline]
2364    pub fn unfreeze(self, p: &Point) -> simulation::Result<Option<ProcessBoxCont<T>>> {
2365        let FrozenProcess { comp } = self;
2366        comp.call_box((p,))
2367    }
2368
2369    /// Freeze the process computation.
2370    #[doc(hidden)]
2371    pub fn new(cont: ProcessBoxCont<T>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<FrozenProcess<T>> {
2372        let cont = substitute_process_priority_boxed(p.priority, cont);
2373        let rh: Grc<RefComp<Option<DisposableBox>>> = Grc::new(RefComp::new(None));
2374        let rc = Grc::new(RefComp::new(Some(cont)));
2375        let h  = {
2376            let rh = rh.clone();
2377            let rc = rc.clone();
2378            let weak_pid = Grc::downgrade(&pid);
2379            pid.cancel_initiating()
2380                .subscribe(cons_observer(move |_, p| {
2381                    let pid = weak_pid.upgrade().expect("The process identifier cannot be removed");
2382                    let h = rh.swap_at(None, p).unwrap();
2383                    h.dispose(p)?;
2384                    match rc.swap_at(None, p) {
2385                        None => Result::Ok(()),
2386                        Some(cont) => {
2387                            let pid = pid.clone();
2388                            enqueue_event(p.time, {
2389                                cons_event(move |p| {
2390                                    let z = is_process_cancelled(&pid, p);
2391                                    if z {
2392                                        revoke_process_boxed(cont, pid, p)
2393                                    } else {
2394                                        Result::Ok(())
2395                                    }
2396                                }).into_boxed()
2397                            }).call_event(p)
2398                        }
2399                    }
2400                }))
2401                .call_event(p)
2402        };
2403        match h {
2404            Result::Err(e) => Result::Err(e),
2405            Result::Ok(h) => {
2406                rh.write_at(Some(h), p);
2407                let comp = {
2408                    cons_event(move |p| {
2409                        let h = rh.swap_at(None, p).unwrap();
2410                        h.dispose(p)?;
2411                        let cont = rc.swap_at(None, p);
2412                        Result::Ok(cont)
2413                    }).into_boxed()
2414                };
2415                Result::Ok(FrozenProcess { comp: comp })
2416            }
2417        }
2418    }
2419
2420    /// Freeze the computation parameters specifying what should be done when reentering the computation.
2421    #[doc(hidden)]
2422    pub fn with_reentering<M>(cont: ProcessBoxCont<T>, pid: Grc<ProcessId>, val: T, comp: M, p: &Point) -> simulation::Result<FrozenProcess<T>>
2423        where M: Process<Item = T> + 'static
2424    {
2425        let cont = substitute_process_priority_boxed(p.priority, cont);
2426        let rh: Grc<RefComp<Option<DisposableBox>>> = Grc::new(RefComp::new(None));
2427        let rc = Grc::new(RefComp::new(Some(cont)));
2428        let h  = {
2429            let rh = rh.clone();
2430            let rc = rc.clone();
2431            let weak_pid = Grc::downgrade(&pid);
2432            pid.cancel_initiating()
2433                .subscribe(cons_observer(move |_, p| {
2434                    let pid = weak_pid.upgrade().expect("The process identifier cannot be removed");
2435                    let h = rh.swap_at(None, p).unwrap();
2436                    h.dispose(p)?;
2437                    match rc.swap_at(None, p) {
2438                        None => Result::Ok(()),
2439                        Some(cont) => {
2440                            let pid = pid.clone();
2441                            enqueue_event(p.time, {
2442                                cons_event(move |p| {
2443                                    let z = is_process_cancelled(&pid, p);
2444                                    if z {
2445                                        revoke_process_boxed(cont, pid, p)
2446                                    } else {
2447                                        Result::Ok(())
2448                                    }
2449                                }).into_boxed()
2450                            }).call_event(p)
2451                        }
2452                    }
2453                }))
2454                .call_event(p)
2455        };
2456        match h {
2457            Result::Err(e) => Result::Err(e),
2458            Result::Ok(h) => {
2459                rh.write_at(Some(h), p);
2460                let comp = {
2461                    cons_event(move |p| {
2462                        let h = rh.swap_at(None, p).unwrap();
2463                        h.dispose(p)?;
2464                        match rc.swap_at(None, p) {
2465                            None => Result::Ok(None),
2466                            Some(cont) => {
2467                                let f = pid.is_preempted(p);
2468                                if !f  {
2469                                    Result::Ok(Some(cont))
2470                                } else {
2471                                    let cont = ProcessBoxCont::new(move |a, pid, p| {
2472                                        match a {
2473                                            Result::Ok(_) => comp.call_process_boxed(cont, pid, p),
2474                                            Result::Err(e) => cont.call_box((Result::Err(e), pid, p))
2475                                        }
2476                                    });
2477                                    match sleep_process(cont, pid, val, p) {
2478                                        Result::Ok(()) => Result::Ok(None),
2479                                        Result::Err(e) => Result::Err(e)
2480                                    }
2481                                }
2482                            }
2483                        }
2484                    }).into_boxed()
2485                };
2486                Result::Ok(FrozenProcess { comp: comp })
2487            }
2488        }
2489    }
2490}
2491
2492/// Reenter the process computation.
2493#[doc(hidden)]
2494pub fn reenter_process<T>(cont: ProcessBoxCont<T>, pid: Grc<ProcessId>, val: T, p: &Point) -> simulation::Result<()>
2495    where T: 'static
2496{
2497    let f = pid.is_preempted(p);
2498    if !f {
2499        enqueue_event(p.time, {
2500            cons_event(move |p| {
2501                let f = pid.is_preempted(p);
2502                if !f {
2503                    resume_process_boxed(cont, pid, val, p)
2504                } else {
2505                    sleep_process(cont, pid, val, p)
2506                }
2507            }).into_boxed()
2508        }).call_event(p)
2509    } else {
2510        sleep_process(cont, pid, val, p)
2511    }
2512}
2513
2514/// Sleep until the preempted computation will be reentered.
2515#[doc(hidden)]
2516pub fn sleep_process<T>(cont: ProcessBoxCont<T>, pid: Grc<ProcessId>, val: T, p: &Point) -> simulation::Result<()>
2517    where T: 'static
2518{
2519    let rh: Grc<RefComp<Option<DisposableBox>>> = Grc::new(RefComp::new(None));
2520    let rc = Grc::new(RefComp::new(Some(cont)));
2521    let rv = Grc::new(RefComp::new(Some(val)));
2522    let h  = {
2523        let rh = rh.clone();
2524        pid.observable()
2525            .subscribe(cons_observer(move |e, p| {
2526                let h = rh.swap_at(None, p).unwrap();
2527                h.dispose(p)?;
2528                match e {
2529                    &ProcessEvent::CancelInitiating => {
2530                        let pid = pid.clone();
2531                        let rc  = rc.clone();
2532                        enqueue_event(p.time, {
2533                            cons_event(move |p| {
2534                                let z = is_process_cancelled(&pid, p);
2535                                if z {
2536                                    let cont = rc.swap_at(None, p).unwrap();
2537                                    revoke_process_boxed(cont, pid, p)
2538                                } else {
2539                                    Result::Ok(())
2540                                }
2541                            }).into_boxed()
2542                        }).call_event(p)
2543                    },
2544                    &ProcessEvent::PreemptionEnding => {
2545                        let pid = pid.clone();
2546                        let rc  = rc.clone();
2547                        let rv  = rv.clone();
2548                        enqueue_event(p.time, {
2549                            cons_event(move |p| {
2550                                let cont = rc.swap_at(None, p).unwrap();
2551                                let val  = rv.swap_at(None, p).unwrap();
2552                                reenter_process(cont, pid, val, p)
2553                            }).into_boxed()
2554                        }).call_event(p)
2555                    },
2556                    &ProcessEvent::PreemptionInitiating => {
2557                        panic!("The computation was already preempted")
2558                    }
2559                }
2560            }))
2561            .call_event(p)
2562    };
2563    match h {
2564        Result::Err(e) => Result::Err(e),
2565        Result::Ok(h) => {
2566            rh.write_at(Some(h), p);
2567            Result::Ok(())
2568        }
2569    }
2570}
2571
2572/// Substitute the process computation.
2573#[doc(hidden)]
2574pub fn substitute_process<C, T, F>(cont: C, f: F) -> ProcessBoxCont<T>
2575    where C: FnOnce(simulation::Result<T>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static,
2576          T: 'static,
2577          F: FnOnce(C, Grc<ProcessId>, T, &Point) -> simulation::Result<()> + 'static,
2578{
2579    ProcessBoxCont::new(move |a, pid, p| {
2580        match a {
2581            Result::Ok(a) => f(cont, pid, a, p),
2582            Result::Err(e) => cont(Result::Err(e), pid, p)
2583        }
2584    })
2585}
2586
2587/// Substitute the process computation.
2588#[doc(hidden)]
2589pub fn substitute_process_boxed<T, F>(cont: ProcessBoxCont<T>, f: F) -> ProcessBoxCont<T>
2590    where T: 'static,
2591          F: FnOnce(ProcessBoxCont<T>, Grc<ProcessId>, T, &Point) -> simulation::Result<()> + 'static
2592{
2593    ProcessBoxCont::new(move |a, pid, p| {
2594        match a {
2595            Result::Ok(a) => f(cont, pid, a, p),
2596            Result::Err(e) => cont.call_box((Result::Err(e), pid, p))
2597        }
2598    })
2599}
2600
2601/// Substitute the process priority.
2602#[doc(hidden)]
2603pub fn substitute_process_priority<C, T>(priority: isize, cont: C) -> ProcessBoxCont<T>
2604    where C: FnOnce(simulation::Result<T>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static,
2605          T: 'static
2606{
2607    ProcessBoxCont::new(move |a, pid, p| {
2608        match a {
2609            Result::Ok(a) => {
2610                if p.priority == priority {
2611                    cont(Result::Ok(a), pid, p)
2612                } else {
2613                    enqueue_event_with_priority(p.time, priority, {
2614                        cons_event(move |p| {
2615                            resume_process(cont, pid, a, p)
2616                        }).into_boxed()
2617                    }).call_event(p)
2618                }
2619            },
2620            Result::Err(e) => {
2621                cont(Result::Err(e), pid, p)
2622            }
2623        }
2624    })
2625}
2626
2627/// Substitute the process priority.
2628#[doc(hidden)]
2629pub fn substitute_process_priority_boxed<T>(priority: isize, cont: ProcessBoxCont<T>) -> ProcessBoxCont<T>
2630    where T: 'static
2631{
2632    ProcessBoxCont::new(move |a, pid, p| {
2633        match a {
2634            Result::Ok(a) => {
2635                if p.priority == priority {
2636                    cont.call_box((Result::Ok(a), pid, p))
2637                } else {
2638                    enqueue_event_with_priority(p.time, priority, {
2639                        cons_event(move |p| {
2640                            resume_process_boxed(cont, pid, a, p)
2641                        }).into_boxed()
2642                    }).call_event(p)
2643                }
2644            },
2645            Result::Err(e) => {
2646                cont.call_box((Result::Err(e), pid, p))
2647            }
2648        }
2649    })
2650}
2651
2652/// Await a signal that should be emitted by the specified observable.
2653#[must_use = "computations are lazy and do nothing unless to be run"]
2654#[derive(Clone)]
2655pub struct Await<O, M> {
2656
2657    /// The observable.
2658    observable: O,
2659
2660    /// To keep the type parameter.
2661    _phantom: PhantomData<M>
2662}
2663
2664impl<O, M> Process for Await<O, M>
2665    where O: Observable<Message = M>,
2666          M: Clone + 'static
2667{
2668    type Item = M;
2669
2670    #[doc(hidden)]
2671    #[inline]
2672    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
2673        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
2674    {
2675        self.call_process_boxed(ProcessBoxCont::new(cont), pid, p)
2676    }
2677
2678    #[doc(hidden)]
2679    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
2680        match FrozenProcess::new(cont, pid.clone(), p) {
2681            Result::Err(e) => Result::Err(e),
2682            Result::Ok(cont) => {
2683                let rh: Grc<RefComp<Option<DisposableBox>>> = Grc::new(RefComp::new(None));
2684                let rh2: Grc<RefComp<Option<DisposableBox>>> = Grc::new(RefComp::new(None));
2685                let rc = RefComp::new(Some(cont));
2686                let h  = {
2687                    let rh = rh.clone();
2688                    let rh2 = rh2.clone();
2689                    let pid = pid.clone();
2690                    let Await { observable, _phantom } = self;
2691                    observable
2692                        .subscribe(cons_observer(move |x: &M, p| {
2693                            if let Some(h) = rh.swap_at(None, p) {
2694                                h.dispose(p)?;
2695                            }                            
2696                            if let Some(h2) = rh2.swap_at(None, p) {
2697                                h2.dispose(p)?;
2698                            }                            
2699                            let cont = rc.swap_at(None, p).unwrap();
2700                            match cont.unfreeze(p) {
2701                                Result::Err(e) => Result::Err(e),
2702                                Result::Ok(None) => Result::Ok(()),
2703                                Result::Ok(Some(cont)) => {
2704                                    let pid = pid.clone();
2705                                    reenter_process(cont, pid, x.clone(), p)
2706                                }
2707                            }
2708                        }))
2709                        .call_event(p)
2710                };
2711                match h {
2712                    Result::Err(e) => return Result::Err(e),
2713                    Result::Ok(h) => {
2714                        let h2 = {
2715                            let rh = rh.clone();
2716                            let rh2 = rh2.clone();
2717                            pid.cancel_initiating()
2718                                .subscribe(cons_observer(move |_, p| {
2719                                    if let Some(h) = rh.swap_at(None, p) {
2720                                        h.dispose(p)?;
2721                                    }                            
2722                                    if let Some(h2) = rh2.swap_at(None, p) {
2723                                        h2.dispose(p)?;
2724                                    }
2725                                    Result::Ok(())
2726                                }))
2727                                .call_event(p)
2728                        };
2729                        match h2 {
2730                            Result::Err(e) => {
2731                                h.dispose(p)?;
2732                                return Result::Err(e)
2733                            },
2734                            Result::Ok(h2) => {
2735                                rh.write_at(Some(h), p);
2736                                rh2.write_at(Some(h2), p);
2737                                Result::Ok(())
2738                            }
2739                        }
2740                    }
2741                }
2742            }
2743        }
2744    }
2745}
2746
2747/// Await a cancellable signal that should be emitted by the specified observable.
2748#[must_use = "computations are lazy and do nothing unless to be run"]
2749#[derive(Clone)]
2750pub struct AwaitResult<O, M> {
2751
2752    /// The observable.
2753    observable: O,
2754
2755    /// To keep the type parameter.
2756    _phantom: PhantomData<M>
2757}
2758
2759impl<O, M> Process for AwaitResult<O, M>
2760    where O: Observable<Message = simulation::Result<M>>,
2761          M: Clone + 'static
2762{
2763    type Item = M;
2764
2765    #[doc(hidden)]
2766    #[inline]
2767    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
2768        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
2769    {
2770        self.call_process_boxed(ProcessBoxCont::new(cont), pid, p)
2771    }
2772
2773    #[doc(hidden)]
2774    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
2775        match FrozenProcess::new(cont, pid.clone(), p) {
2776            Result::Err(e) => Result::Err(e),
2777            Result::Ok(cont) => {
2778                let rh: Grc<RefComp<Option<DisposableBox>>> = Grc::new(RefComp::new(None));
2779                let rh2: Grc<RefComp<Option<DisposableBox>>> = Grc::new(RefComp::new(None));
2780                let rc = RefComp::new(Some(cont));
2781                let h  = {
2782                    let rh = rh.clone();
2783                    let rh2 = rh2.clone();
2784                    let pid = pid.clone();
2785                    let AwaitResult { observable, _phantom } = self;
2786                    observable
2787                        .subscribe(cons_observer(move |x: &simulation::Result<M>, p| {
2788                            if let Some(h) = rh.swap_at(None, p) {
2789                                h.dispose(p)?;
2790                            }                            
2791                            if let Some(h2) = rh2.swap_at(None, p) {
2792                                h2.dispose(p)?;
2793                            }                            
2794                            let cont = rc.swap_at(None, p).unwrap();
2795                            match cont.unfreeze(p) {
2796                                Result::Err(e) => Result::Err(e),
2797                                Result::Ok(None) => Result::Ok(()),
2798                                Result::Ok(Some(cont)) => {
2799                                    let pid = pid.clone();
2800                                    match x {
2801                                        Result::Ok(x) => {
2802                                            reenter_process(cont, pid, x.clone(), p)
2803                                        },
2804                                        Result::Err(Error::Cancel) => {
2805                                            revoke_process_boxed(cont, pid, p)
2806                                        },
2807                                        Result::Err(Error::Other(e)) => {
2808                                            match e.deref() {
2809                                                &OtherError::Retry(_) => {
2810                                                    cut_error_process_boxed(cont, pid, e.clone(), p)
2811                                                },
2812                                                &OtherError::Panic(_) => {
2813                                                    cut_error_process_boxed(cont, pid, e.clone(), p)
2814                                                },
2815                                                &OtherError::IO(_) => {
2816                                                    propagate_error_process_boxed(cont, pid, e.clone(), p)
2817                                                }
2818                                            }
2819                                        }
2820                                    }
2821                                }
2822                            }
2823                        }))
2824                        .call_event(p)
2825                };
2826                match h {
2827                    Result::Err(e) => return Result::Err(e),
2828                    Result::Ok(h) => {
2829                        let h2 = {
2830                            let rh = rh.clone();
2831                            let rh2 = rh2.clone();
2832                            pid.cancel_initiating()
2833                                .subscribe(cons_observer(move |_, p| {
2834                                    if let Some(h) = rh.swap_at(None, p) {
2835                                        h.dispose(p)?;
2836                                    }                            
2837                                    if let Some(h2) = rh2.swap_at(None, p) {
2838                                        h2.dispose(p)?;
2839                                    }
2840                                    Result::Ok(())
2841                                }))
2842                                .call_event(p)
2843                        };
2844                        match h2 {
2845                            Result::Err(e) => {
2846                                h.dispose(p)?;
2847                                return Result::Err(e)
2848                            },
2849                            Result::Ok(h2) => {
2850                                rh.write_at(Some(h), p);
2851                                rh2.write_at(Some(h2), p);
2852                                Result::Ok(())
2853                            }
2854                        }
2855                    }
2856                }
2857            }
2858        }
2859    }
2860}
2861
2862/// Like the GoTo statement it transfers the direction of computation.
2863#[must_use = "computations are lazy and do nothing unless to be run"]
2864#[derive(Clone)]
2865pub struct Transfer<T, M> {
2866
2867    /// The computation to transfer the flow control to.
2868    comp: M,
2869
2870    /// To keep the type parameter.
2871    _phantom: PhantomData<T>
2872}
2873
2874impl<T, M> Process for Transfer<T, M>
2875    where M: Process<Item = ()>
2876{
2877    type Item = T;
2878
2879    #[doc(hidden)]
2880    #[inline]
2881    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
2882        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
2883    {
2884        if is_process_cancelled(&pid, p) {
2885            revoke_process(cont, pid, p)
2886        } else {
2887            let Transfer { comp, _phantom } = self;
2888            fn cont(a: simulation::Result<()>, _pid: Grc<ProcessId>, _p: &Point) -> simulation::Result<()> {
2889                a
2890            }
2891            comp.call_process(cont, pid, p)
2892        }
2893    }
2894
2895    #[doc(hidden)]
2896    #[inline]
2897    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
2898        if is_process_cancelled(&pid, p) {
2899            revoke_process_boxed(cont, pid, p)
2900        } else {
2901            let Transfer { comp, _phantom } = self;
2902            fn cont(a: simulation::Result<()>, _pid: Grc<ProcessId>, _p: &Point) -> simulation::Result<()> {
2903                a
2904            }
2905            comp.call_process(cont, pid, p)
2906        }
2907    }
2908}
2909
2910/// The process that never returns a result.
2911#[must_use = "computations are lazy and do nothing unless to be run"]
2912#[derive(Clone)]
2913pub struct Never<T> {
2914
2915    /// To keep the type parameter.
2916    _phantom: PhantomData<T>
2917}
2918
2919impl<T> Process for Never<T> {
2920
2921    type Item = T;
2922
2923    #[doc(hidden)]
2924    #[inline]
2925    fn call_process<C>(self, _cont: C, _pid: Grc<ProcessId>, _p: &Point) -> simulation::Result<()>
2926        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
2927    {
2928        Result::Ok(())
2929    }
2930
2931    #[doc(hidden)]
2932    #[inline]
2933    fn call_process_boxed(self, _cont: ProcessBoxCont<Self::Item>, _pid: Grc<ProcessId>, _p: &Point) -> simulation::Result<()> {
2934        Result::Ok(())
2935    }
2936}
2937
2938/// Register a handler that will be invoked in case of cancelling the current process.
2939#[must_use = "computations are lazy and do nothing unless to be run"]
2940#[derive(Clone)]
2941pub struct WhenCancelling<M> {
2942
2943    /// The computation.
2944    comp: M
2945}
2946
2947impl<M> Process for WhenCancelling<M>
2948    where M: Observer<Message = (), Item = ()> + 'static
2949{
2950    type Item = ();
2951
2952    #[doc(hidden)]
2953    #[inline]
2954    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
2955        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
2956    {
2957        if is_process_cancelled(&pid, p) {
2958            revoke_process(cont, pid, p)
2959        } else {
2960            let WhenCancelling { comp } = self;
2961            pid.cancel_initiating()
2962                .subscribe(cons_observer(move |_, p| {
2963                    comp.call_observer(&(), p)
2964                }))
2965                .call_event(p)?;
2966            resume_process(cont, pid, (), p)
2967        }
2968    }
2969
2970    #[doc(hidden)]
2971    #[inline]
2972    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
2973        if is_process_cancelled(&pid, p) {
2974            revoke_process_boxed(cont, pid, p)
2975        } else {
2976            let WhenCancelling { comp } = self;
2977            pid.cancel_initiating()
2978                .subscribe(cons_observer(move |_, p| {
2979                    comp.call_observer(&(), p)
2980                }))
2981                .call_event(p)?;
2982            resume_process_boxed(cont, pid, (), p)
2983        }
2984    }
2985}
2986
2987/// Allows creating the `Process` computation that panics with the specified message.
2988#[must_use = "computations are lazy and do nothing unless to be run"]
2989#[derive(Clone)]
2990pub struct Panic<T> {
2991
2992    /// The panic message.
2993    msg: String,
2994
2995    /// Keep the type parameter.
2996    _phantom: PhantomData<T>
2997}
2998
2999impl<T> Process for Panic<T> {
3000
3001    type Item = T;
3002
3003    #[doc(hidden)]
3004    #[inline]
3005    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
3006        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
3007    {
3008        if is_process_cancelled(&pid, p) {
3009            revoke_process(cont, pid, p)
3010        } else {
3011            let Panic { msg, _phantom } = self;
3012            let err = Rc::new(OtherError::Panic(msg));
3013            cut_error_process(cont, pid, err, p)
3014        }
3015    }
3016
3017    #[doc(hidden)]
3018    #[inline]
3019    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
3020        if is_process_cancelled(&pid, p) {
3021            revoke_process_boxed(cont, pid, p)
3022        } else {
3023            let Panic { msg, _phantom } = self;
3024            let err = Rc::new(OtherError::Panic(msg));
3025            cut_error_process_boxed(cont, pid, err, p)
3026        }
3027    }
3028}
3029
3030/// Trace the computation.
3031#[must_use = "computations are lazy and do nothing unless to be run"]
3032#[derive(Clone)]
3033pub struct Trace<M> {
3034
3035    /// The computation.
3036    comp: M,
3037
3038    /// The message to print.
3039    msg: String
3040}
3041
3042impl<M> Process for Trace<M>
3043    where M: Process
3044{
3045    type Item = M::Item;
3046
3047    #[doc(hidden)]
3048    #[inline]
3049    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
3050        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
3051    {
3052        if is_process_cancelled(&pid, p) {
3053            revoke_process(cont, pid, p)
3054        } else {
3055            let Trace { comp, msg } = self;
3056            p.trace(&msg);
3057            comp.call_process(cont, pid, p)
3058        }
3059    }
3060
3061    #[doc(hidden)]
3062    #[inline]
3063    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
3064        if is_process_cancelled(&pid, p) {
3065            revoke_process_boxed(cont, pid, p)
3066        } else {
3067            let Trace { comp, msg } = self;
3068            p.trace(&msg);
3069            comp.call_process_boxed(cont, pid, p)
3070        }
3071    }
3072}
3073/// Proceed with the process that would use the specified time point priority.
3074#[must_use = "computations are lazy and do nothing unless to be run"]
3075#[derive(Clone)]
3076pub struct ProcessWithPriority {
3077
3078    /// The time point priority.
3079    priority: isize
3080}
3081
3082impl Process for ProcessWithPriority {
3083
3084    type Item = ();
3085
3086    #[doc(hidden)]
3087    #[inline]
3088    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
3089        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
3090    {
3091        if is_process_cancelled(&pid, p) {
3092            revoke_process(cont, pid, p)
3093        } else {
3094            let ProcessWithPriority { priority } = self;
3095            if priority == p.priority {
3096                cont(Result::Ok(()), pid, p)
3097            } else {
3098                let comp = cons_event(move |p| {
3099                    if is_process_cancelled(&pid, p) {
3100                        revoke_process(cont, pid, p)
3101                    } else {
3102                        cont(Result::Ok(()), pid, p)
3103                    }
3104                });
3105                enqueue_event_with_priority(p.time, priority, comp.into_boxed())
3106                    .call_event(p)
3107            }
3108        }
3109    }
3110
3111    #[doc(hidden)]
3112    #[inline]
3113    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
3114        if is_process_cancelled(&pid, p) {
3115            revoke_process_boxed(cont, pid, p)
3116        } else {
3117            let ProcessWithPriority { priority } = self;
3118            if priority == p.priority {
3119                cont.call_box((Result::Ok(()), pid, p))
3120            } else {
3121                let comp = cons_event(move |p| {
3122                    if is_process_cancelled(&pid, p) {
3123                        revoke_process_boxed(cont, pid, p)
3124                    } else {
3125                        cont.call_box((Result::Ok(()), pid, p))
3126                    }
3127                });
3128                enqueue_event_with_priority(p.time, priority, comp.into_boxed())
3129                    .call_event(p)
3130            }
3131        }
3132    }
3133}
3134
3135/// Embeds the result value within the `Process` computation.
3136#[must_use = "computations are lazy and do nothing unless to be run"]
3137#[derive(Clone)]
3138pub struct EmbedResult<T> {
3139
3140    /// The value to be embedded.
3141    val: simulation::Result<T>
3142}
3143
3144impl<T> Process for EmbedResult<T> {
3145
3146    type Item = T;
3147
3148    #[doc(hidden)]
3149    #[inline]
3150    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
3151        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
3152    {
3153        if is_process_cancelled(&pid, p) {
3154            revoke_process(cont, pid, p)
3155        } else {
3156            let EmbedResult { val } = self;
3157            match val {
3158                Result::Ok(a) => {
3159                    cont(Result::Ok(a), pid, p)
3160                },
3161                Result::Err(Error::Cancel) => {
3162                    let comp = cancel_process();
3163                    comp.call_process(cont, pid, p)
3164                },
3165                Result::Err(Error::Other(e)) => {
3166                    match e.deref() {
3167                        &OtherError::Retry(_) => {
3168                            cut_error_process(cont, pid, e, p)
3169                        },
3170                        &OtherError::Panic(_) => {
3171                            cut_error_process(cont, pid, e, p)
3172                        },
3173                        &OtherError::IO(_) => {
3174                            propagate_error_process(cont, pid, e, p)
3175                        }
3176                    }
3177                }
3178            }
3179        }
3180    }
3181
3182    #[doc(hidden)]
3183    #[inline]
3184    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
3185        if is_process_cancelled(&pid, p) {
3186            revoke_process_boxed(cont, pid, p)
3187        } else {
3188            let EmbedResult { val } = self;
3189            match val {
3190                Result::Ok(a) => {
3191                    cont.call_box((Result::Ok(a), pid, p))
3192                },
3193                Result::Err(Error::Cancel) => {
3194                    let comp = cancel_process();
3195                    comp.call_process_boxed(cont, pid, p)
3196                },
3197                Result::Err(Error::Other(e)) => {
3198                    match e.deref() {
3199                        &OtherError::Retry(_) => {
3200                            cut_error_process_boxed(cont, pid, e, p)
3201                        },
3202                        &OtherError::Panic(_) => {
3203                            cut_error_process_boxed(cont, pid, e, p)
3204                        },
3205                        &OtherError::IO(_) => {
3206                            propagate_error_process_boxed(cont, pid, e, p)
3207                        }
3208                    }
3209                }
3210            }
3211        }
3212    }
3213}