dvcompute_branch/simulation/process/
mod.rs

1// Copyright (c) 2020-2022  David Sorokin <davsor@mail.ru>, based in Yoshkar-Ola, Russia
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at https://mozilla.org/MPL/2.0/.
6
7use std::rc::Rc;
8use std::marker::PhantomData;
9use std::ops::Deref;
10
11use dvcompute_utils::collections::im::*;
12
13use crate::simulation;
14use crate::simulation::Point;
15use crate::simulation::error::*;
16use crate::simulation::event::*;
17use crate::simulation::{Run as SimulationRun};
18use crate::simulation::simulation::Simulation;
19use crate::simulation::observable::*;
20use crate::simulation::observable::observer::*;
21use crate::simulation::observable::source::*;
22use crate::simulation::observable::disposable::*;
23use crate::simulation::ref_comp::RefComp;
24
25use dvcompute_utils::grc::Grc;
26
27/// Random processes.
28pub mod random;
29
30/// Additional operations.
31pub mod ops;
32
33/// Return a new `Process` computation by the specified pure value.
34#[inline]
35pub fn return_process<T>(val: T) -> Return<T> {
36    Return { val: val }
37}
38
39/// Return a `Process` computation that panics with the specified error message.
40#[inline]
41pub fn panic_process<T>(msg: String) -> Panic<T> {
42    Panic { msg: msg, _phantom: PhantomData }
43}
44
45/// Delay the `Process` computation.
46#[inline]
47pub fn delay_process<F, M>(f: F) -> Delay<F, M>
48    where F: FnOnce() -> M + Clone + 'static,
49          M: Process + 'static
50{
51    Delay { f: f, _phantom: PhantomData }
52}
53
/// Return the corresponding process identifier.
///
/// This function only constructs the `Id` value; the identifier itself
/// is expected to be supplied when that value is evaluated as a computation.
#[inline]
pub fn process_id() -> Id {
    Id {}
}
59
/// Cancel the current process.
///
/// The type parameter `T` is the item type of the resulting computation,
/// which is carried only as a phantom marker.
#[inline]
pub fn cancel_process<T>() -> Cancel<T> {
    Cancel { _phantom: PhantomData }
}
65
66/// Hold the process for the specified time interval.
67#[inline]
68pub fn hold_process(dt: f64) -> Hold {
69     Hold { dt: dt }
70}
71
/// Passivate the process.
///
/// This function only constructs the `Passivate` computation.
#[inline]
pub fn passivate_process() -> Passivate {
    Passivate {}
}
77
78/// Passivate the process before performing some action.
79#[inline]
80pub fn passivate_process_before<M>(comp: M) -> PassivateBefore<M>
81    where M: Event<Item = ()>
82{
83    PassivateBefore { comp: comp }
84}
85
86/// Execute the `Process` computation in loop.
87#[inline]
88pub fn loop_process<F, M>(f: F) -> Loop<F, M>
89    where F: Fn() -> M + Clone + 'static,
90          M: Process<Item = ()> + 'static
91{
92    Loop { f: f, _phantom: PhantomData }
93}
94
95/// Spawn the child process. In case of cancelling one of the processes,
96/// other process will be cancelled too.
97#[inline]
98pub fn spawn_process<M>(comp: M) -> Spawn<M>
99    where M: Process<Item = ()> + Clone + 'static
100{
101    spawn_process_with(ProcessCancellation::CancelTogether, comp)
102}
103
104/// Spawn the child process specifying how the child and parent processes
105/// should be cancelled in case of need.
106#[inline]
107pub fn spawn_process_using_id<M>(pid: Grc<ProcessId>, comp: M) -> SpawnUsingId<M>
108    where M: Process<Item = ()> + Clone + 'static
109{
110    spawn_process_using_id_with(ProcessCancellation::CancelTogether, pid, comp)
111}
112
113/// Spawn the child process specifying how the child and parent processes
114/// should be cancelled in case of need.
115#[inline]
116pub fn spawn_process_with<M>(cancellation: ProcessCancellation, comp: M) -> Spawn<M>
117    where M: Process<Item = ()> + Clone + 'static
118{
119    Spawn { cancellation: cancellation, comp: comp }
120}
121
122/// Spawn the child process specifying how the child and parent processes
123/// should be cancelled in case of need.
124#[inline]
125pub fn spawn_process_using_id_with<M>(cancellation: ProcessCancellation, pid: Grc<ProcessId>, comp: M) -> SpawnUsingId<M>
126    where M: Process<Item = ()> + Clone + 'static
127{
128    SpawnUsingId { cancellation: cancellation, comp: comp, comp_id: pid }
129}
130
131/// Await a signal that should be emitted by the specified observable.
132#[inline]
133pub fn process_await<O>(observable: O) -> Await<O, O::Message>
134    where O: Observable,
135          O::Message: Clone + 'static
136{
137    Await { observable: observable, _phantom: PhantomData }
138}
139
140/// Await a cancellable signal that should be emitted by the specified observable.
141#[inline]
142pub fn process_await_result<O, M>(observable: O) -> AwaitResult<O, M>
143    where O: Observable<Message = simulation::Result<M>>,
144          M: Clone + 'static
145{
146    AwaitResult { observable: observable, _phantom: PhantomData }
147}
148
149/// Try to run the child process within the specified timeout.
150#[inline]
151pub fn timeout_process<M>(timeout: f64, comp: M) -> Timeout<M>
152    where M: Process + Clone + 'static,
153          M::Item: Clone
154{
155    Timeout { timeout: timeout, comp: comp }
156}
157
158/// Try to run the child process within the specified timeout by using the given identifier.
159#[inline]
160pub fn timeout_process_using_id<M>(timeout: f64, pid: Grc<ProcessId>, comp: M) -> TimeoutUsingId<M>
161    where M: Process + Clone + 'static,
162          M::Item: Clone
163{
164    TimeoutUsingId { timeout: timeout, comp: comp, comp_id: pid }
165}
166
167/// Like the GoTo statement it transfers the direction of computation.
168#[inline]
169pub fn transfer_process<T, M>(comp: M) -> Transfer<T, M>
170    where M: Process<Item = ()>
171{
172    Transfer { comp: comp, _phantom: PhantomData }
173}
174
/// A process that never returns a result.
///
/// The type parameter `T` is the item type of the resulting computation,
/// which is carried only as a phantom marker.
#[inline]
pub fn never_process<T>() -> Never<T> {
    Never { _phantom: PhantomData }
}
180
181/// Register a handler that will be invoked in case of cancelling the current process.
182#[inline]
183pub fn when_cancelling_process<M>(comp: M) -> WhenCancelling<M>
184    where M: Observer<Message = (), Item = ()> + Clone + 'static
185{
186    WhenCancelling { comp: comp }
187}
188
189/// Embeds the result value within the current process, which may cancel the process, for example.
190#[inline]
191pub fn embed_result_process<T>(val: simulation::Result<T>) -> EmbedResult<T> {
192    EmbedResult { val: val }
193}
194
195/// Create a sequence of computations.
196#[inline]
197pub fn process_sequence<M>(comps: List<M>) -> Sequence<M>
198    where M: Process + Clone + 'static,
199          M::Item: Clone
200{
201    Sequence { comps: comps, acc: List::Nil }
202}
203
204/// Create a sequence of computations, where the result is ignored.
205#[inline]
206pub fn process_sequence_<M>(comps: List<M>) -> Sequence_<M>
207    where M: Process + Clone + 'static,
208          M::Item: Clone
209{
210    Sequence_ { comps: comps }
211}
212
213/// Proceed with the process that would use the specified time point priority.
214#[inline]
215pub fn process_with_priority(priority: isize) -> ProcessWithPriority {
216    ProcessWithPriority { priority: priority }
217}
218
219/// Wrap the computation so that it would restore its priority.
220#[inline]
221pub fn restore_process_priority<M>(priority: isize, comp: M) -> impl Process<Item = M::Item>
222    where M: Process + Clone + 'static
223{
224    process_with_priority(priority).and_then(|()| { comp })
225}
226
227/// Trace the computation.
228#[inline]
229pub fn trace_process<M>(msg: String, comp: M) -> Trace<M>
230    where M: Process
231{
232    Trace { comp: comp, msg: msg}
233}
234
235/// Check whether the `Process` computation should be cancelled.
236#[doc(hidden)]
237#[inline]
238pub fn is_process_cancelled(pid: &ProcessId, p: &Point) -> bool {
239    pid.is_cancel_activated(p)
240}
241
242/// Revoke the `Process` computation.
243#[doc(hidden)]
244pub fn revoke_process<T, C>(cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
245    where C: FnOnce(simulation::Result<T>, Grc<ProcessId>, &Point) -> simulation::Result<()> + Clone
246{
247    pid.deactivate_cancel(p);
248    cont(Result::Err(Error::Cancel), pid, p)
249}
250
251/// Revoke the `Process` computation using the boxed continuation.
252#[doc(hidden)]
253pub fn revoke_process_boxed<T>(cont: ProcessBoxCont<T>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
254    pid.deactivate_cancel(p);
255    cont.call_box((Result::Err(Error::Cancel), pid, p))
256}
257
258/// Cut the error `Process` computation.
259#[doc(hidden)]
260pub fn cut_error_process<T, C>(_cont: C, _pid: Grc<ProcessId>, err: Rc<OtherError>, _p: &Point) -> simulation::Result<()>
261    where C: FnOnce(simulation::Result<T>, Grc<ProcessId>, &Point) -> simulation::Result<()>
262{
263    Result::Err(Error::Other(err))
264}
265
266/// Cut the error `Process` computation by using the boxed continuation.
267#[doc(hidden)]
268pub fn cut_error_process_boxed<T>(_cont: ProcessBoxCont<T>, _pid: Grc<ProcessId>, err: Rc<OtherError>, _p: &Point) -> simulation::Result<()> {
269    Result::Err(Error::Other(err))
270}
271
272/// Propagate the error within `Process` computation.
273#[doc(hidden)]
274pub fn propagate_error_process<T, C>(cont: C, pid: Grc<ProcessId>, err: Rc<OtherError>, p: &Point) -> simulation::Result<()>
275    where C: FnOnce(simulation::Result<T>, Grc<ProcessId>, &Point) -> simulation::Result<()>
276{
277    cont(Result::Err(Error::Other(err)), pid, p)
278}
279
280/// Propagate the error within `Process` computation.
281#[doc(hidden)]
282pub fn propagate_error_process_boxed<T>(cont: ProcessBoxCont<T>, pid: Grc<ProcessId>, err: Rc<OtherError>, p: &Point) -> simulation::Result<()> {
283    cont.call_box((Result::Err(Error::Other(err)), pid, p))
284}
285
286/// Resume the `Process` computation.
287#[doc(hidden)]
288#[inline]
289pub fn resume_process<T, C>(cont: C, pid: Grc<ProcessId>, t: T, p: &Point) -> simulation::Result<()>
290    where C: FnOnce(simulation::Result<T>, Grc<ProcessId>, &Point) -> simulation::Result<()> + Clone
291{
292    if is_process_cancelled(&pid, p) {
293        revoke_process(cont, pid, p)
294    } else {
295        cont(Result::Ok(t), pid, p)
296    }
297}
298
299/// Resume the `Process` computation using the boxed continuation.
300#[doc(hidden)]
301#[inline]
302pub fn resume_process_boxed<T>(cont: ProcessBoxCont<T>, pid: Grc<ProcessId>, t: T, p: &Point) -> simulation::Result<()> {
303    if is_process_cancelled(&pid, p) {
304        revoke_process_boxed(cont, pid, p)
305    } else {
306        cont.call_box((Result::Ok(t), pid, p))
307    }
308}
309
/// It defines how the parent and child computations should be cancelled.
///
/// The type is a plain fieldless enumeration, so it is cheap to copy
/// and it can be printed for diagnostics.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum ProcessCancellation {

    /// Cancel both computations together.
    CancelTogether,

    /// Cancel the child if its parent is cancelled.
    CancelChildAfterParent,

    /// Cancel the parent if its child is cancelled.
    CancelParentAfterChild,

    /// Cancel the computations in isolation.
    CancelInIsolation
}
326
/// The computation identifier.
///
/// It keeps the mutable state of a single process: the cancellation flags,
/// the preemption counter, the source of process events and the continuations
/// that are stored to resume the process later.
pub struct ProcessId {

    /// Whether the cancellation has been initiated.
    /// It is set by `initiate_cancel_at`, which does nothing on repeated calls.
    cancel_initiated: RefComp<bool>,

    /// Whether the cancellation has been activated.
    /// It is set by `initiate_cancel_at` and cleared by `deactivate_cancel`.
    cancel_activated: RefComp<bool>,

    /// The count of preemption requests: it is increased by `begin_preemption_at`
    /// and decreased by `end_preemption_at`.
    preemption_count: RefComp<isize>,

    /// The source of events, which are published through `observable`.
    source: ObservableSource<ProcessEvent>,

    /// Whether the process is already started.
    /// It is checked and set by `prepare`.
    started: RefComp<bool>,

    /// Whether the process was passivated.
    react_flag: RefComp<Option<bool>>,

    /// The reactivation continuation.
    react_cont: RefComp<Option<ProcessBoxCont<()>>>,

    /// The priority that must be applied in case of reactivation.
    react_priority: RefComp<isize>,

    /// Whether the process was interrupted prematurely.
    interrupt_flag: RefComp<Option<bool>>,

    /// The interruption continuation.
    interrupt_cont: RefComp<Option<ProcessBoxCont<()>>>,

    /// The time of interruption.
    /// Initially, it is the start time of the simulation specs.
    interrupt_time: RefComp<f64>,

    /// The version of interruption.
    /// It is increased (with wrapping) by `on_preempted_at`.
    interrupt_ver: RefComp<i64>,

    /// The priority that must be applied in case of interruption.
    interrupt_priority: RefComp<isize>
}
369
370impl ProcessId {
371
    /// Create a new computation identifier.
    ///
    /// It returns the `NewProcessId` computation only; the `ProcessId` value
    /// is constructed when that computation is called as a `Simulation`.
    pub fn new() -> NewProcessId {
        NewProcessId {}
    }
376
377    /// Prepare the process identifier.
378    fn prepare(pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
379        let y = pid.started.read_at(p);
380        if y {
381            panic!("Another process with the specified identifier has been started already")
382        } else {
383            pid.started.write_at(true, p)
384        }
385        let disposable = {
386            let observable = pid.observable();
387            let pid = Grc::downgrade(&pid);
388            observable
389                .subscribe(cons_observer(move |e, p| {
390                    match pid.upgrade() {
391                        None => {
392                            let msg = String::from("It should not happen");
393                            let err = Error::retry(msg);
394                            Result::Err(err)
395                        },
396                        Some(pid) => {
397                            match e {
398                                ProcessEvent::CancelInitiating => {
399                                    if pid.is_cancel_activated(p) {
400                                        match ProcessId::interrupt(pid.clone()).call_event(p) {
401                                            Result::Ok(()) => ProcessId::reactivate(pid).call_event(p),
402                                            Result::Err(e) => Result::Err(e)
403                                        }
404                                    } else {
405                                        Result::Ok(())
406                                    }
407                                },
408                                ProcessEvent::PreemptionInitiating => {
409                                    ProcessId::on_preempted_at(pid, p)
410                                },
411                                ProcessEvent::PreemptionEnding => {
412                                    Result::Ok(())
413                                }
414                            }
415                        }
416                    }
417                })).call_event(p)
418        };
419        match disposable {
420            Result::Ok(_) => Result::Ok(()),
421            Result::Err(e) => Result::Err(e)
422        }
423    }
424
    /// Returns the observable of computation events,
    /// which is published by the internal event source.
    #[inline]
    pub fn observable(&self) -> Publish<ProcessEvent> {
        self.source.publish()
    }
430
431    /// Signal when the cancellation is initiated.
432    #[inline]
433    pub fn cancel_initiating(&self) -> impl Observable<Message = ProcessEvent> + Clone {
434        self.observable().filter(|m| { *m == ProcessEvent::CancelInitiating })
435    }
436
    /// Whether the cancellation was initiated, as read at the point `p`.
    #[doc(hidden)]
    #[inline]
    pub fn is_cancel_initiated_at(&self, p: &Point) -> bool {
        self.cancel_initiated.read_at(p)
    }
443
444    /// Whether the cancellation was initiated.
445    #[inline]
446    pub fn is_cancel_initiated(pid: Grc<ProcessId>) -> impl Event<Item = bool> + Clone {
447        cons_event(move |p| Result::Ok(pid.is_cancel_initiated_at(p)))
448    }
449
    /// Whether the cancellation was activated, as read at the point `p`.
    ///
    /// The flag is set by `initiate_cancel_at` and cleared by `deactivate_cancel`.
    #[inline]
    fn is_cancel_activated(&self, p: &Point) -> bool {
        self.cancel_activated.read_at(p)
    }
455
    /// Deactivate the cancellation.
    ///
    /// Only the activation flag is cleared; the flag that the cancellation
    /// was initiated is left untouched.
    #[inline]
    fn deactivate_cancel(&self, p: &Point) {
        self.cancel_activated.write_at(false, p);
    }
461
462    /// Cancel a process with the specified identifier.
463    #[inline]
464    pub fn cancel(pid: Grc<ProcessId>) -> CancelById {
465        CancelById { pid: pid }
466    }
467
468    /// If the main computation is cancelled then all nested ones will be cancelled too.
469    fn _bind_cancel(this: Grc<ProcessId>, others: Vec<Grc<ProcessId>>, p: &Point) -> simulation::Result<DisposableBox> {
470        let this2 = this.clone();
471        let this1 = this;
472        let others2 = {
473            others.iter().map(|pid| pid.clone()).collect::<Vec<_>>()
474        };
475        let others1 = others;
476        let hs1 = {
477            others1.iter().map(|pid| {
478                let this1 = Grc::downgrade(&this1);
479                pid.cancel_initiating()
480                    .subscribe(cons_observer(move |_, p| {
481                        match this1.upgrade() {
482                            None => Result::Ok(()),
483                            Some(this1) => this1.initiate_cancel_at(p)
484                        }
485                    }))
486            })
487        };
488        let hs1 = event_sequence(hs1).call_event(p);
489        match hs1 {
490            Result::Err(e) => Result::Err(e),
491            Result::Ok(hs1) => {
492                let hs2 = {
493                    others2.iter().map(|pid| {
494                        let pid = Grc::downgrade(&pid);
495                        this2.cancel_initiating()
496                            .subscribe(cons_observer(move |_, p| {
497                                match pid.upgrade() {
498                                    None => Result::Ok(()),
499                                    Some(pid) => pid.initiate_cancel_at(p)
500                                }
501                            }))
502                    })
503                };
504                let hs2 = event_sequence(hs2).call_event(p);
505                match hs2 {
506                    Result::Err(e) => Result::Err(e),
507                    Result::Ok(hs2) => {
508                        let hs1 = concat_disposables(hs1.into_iter());
509                        let hs2 = concat_disposables(hs2.into_iter());
510
511                        Result::Ok(hs1.merge(hs2).into_boxed())
512                    }
513                }
514            }
515        }
516    }
517
518    /// Connect the parent computation to the child one.
519    fn connect_cancel(parent: Grc<ProcessId>, cancellation: ProcessCancellation, child: Grc<ProcessId>, p: &Point) -> simulation::Result<DisposableBox> {
520        let h1 = match cancellation {
521            ProcessCancellation::CancelTogether |
522            ProcessCancellation::CancelChildAfterParent => {
523                let child = Grc::downgrade(&child);
524                parent.cancel_initiating()
525                    .subscribe(cons_observer(move |_, p| {
526                        match child.upgrade() {
527                            None => Result::Ok(()),
528                            Some(child) => child.initiate_cancel_at(p)
529                        }
530                    }))
531                    .call_event(p)
532            },
533            ProcessCancellation::CancelParentAfterChild |
534            ProcessCancellation::CancelInIsolation => {
535                Result::Ok(empty_disposable().into_boxed())
536            }
537        };
538        match h1 {
539            Result::Err(e) => Result::Err(e),
540            Result::Ok(h1) => {
541                let h2 = match cancellation {
542                    ProcessCancellation::CancelTogether |
543                    ProcessCancellation::CancelParentAfterChild => {
544                        let parent = Grc::downgrade(&parent);
545                        child.cancel_initiating()
546                            .subscribe(cons_observer(move |_, p| {
547                                match parent.upgrade() {
548                                    None => Result::Ok(()),
549                                    Some(parent) => parent.initiate_cancel_at(p)
550                                }
551                            }))
552                            .call_event(p)
553                    },
554                    ProcessCancellation::CancelChildAfterParent |
555                    ProcessCancellation::CancelInIsolation => {
556                        Result::Ok(empty_disposable().into_boxed())
557                    }
558                };
559                match h2 {
560                    Result::Err(e) => {
561                        h1.call_box((p,))?;
562                        Result::Err(e)
563                    },
564                    Result::Ok(h2) => Result::Ok(h1.merge(h2).into_boxed())
565                }
566            }
567        }
568    }
569
570    /// Initiate the cancellation.
571    #[doc(hidden)]
572    pub fn initiate_cancel_at(&self, p: &Point) -> simulation::Result<()> {
573        let f = self.cancel_initiated.read_at(p);
574        if !f {
575            self.cancel_initiated.write_at(true, p);
576            self.cancel_activated.write_at(true, p);
577            self.source.trigger_at(&ProcessEvent::CancelInitiating, p)
578        } else {
579            Result::Ok(())
580        }
581    }
582
583    /// Preempt the computation.
584    #[doc(hidden)]
585    pub fn begin_preemption_at(&self, p: &Point) -> simulation::Result<()> {
586        let f = self.cancel_initiated.read_at(p);
587        if !f {
588            let n = self.preemption_count.read_at(p);
589            self.preemption_count.write_at(n + 1, p);
590            if n == 0 {
591                self.source.trigger_at(&ProcessEvent::PreemptionInitiating, p)
592            } else {
593                Result::Ok(())
594            }
595        } else {
596            Result::Ok(())
597        }
598    }
599
600    /// Proceed with the computation after it was preempted earlier.
601    #[doc(hidden)]
602    pub fn end_preemption_at(&self, p: &Point) -> simulation::Result<()> {
603        let f = self.cancel_initiated.read_at(p);
604        if !f {
605            let n = self.preemption_count.read_at(p);
606            self.preemption_count.write_at(n - 1, p);
607            if n - 1 == 0 {
608                self.source.trigger_at(&ProcessEvent::PreemptionEnding, p)
609            } else {
610                Result::Ok(())
611            }
612        } else {
613            Result::Ok(())
614        }
615    }
616
617    /// Preempt the computation.
618    #[inline]
619    pub fn begin_preemption(pid: Grc<ProcessId>) -> impl Event<Item = ()> + Clone {
620        cons_event(move |p| pid.begin_preemption_at(p))
621    }
622
623    /// Proceed with the computation after it was preempted earlier.
624    #[inline]
625    pub fn end_preemption(pid: Grc<ProcessId>) -> impl Event<Item = ()> + Clone {
626        cons_event(move |p| pid.end_preemption_at(p))
627    }
628
629    /// Signal when the computation is preempted.
630    #[inline]
631    pub fn preemption_initiating(&self) -> impl Observable<Message = ProcessEvent> + Clone {
632        self.observable().filter(|m| { *m == ProcessEvent::PreemptionInitiating })
633    }
634
635    /// Signal when the computation is proceeded after it was preempted.
636    #[inline]
637    pub fn preemption_ending(&self) -> impl Observable<Message = ProcessEvent> + Clone {
638        self.observable().filter(|m| { *m == ProcessEvent::PreemptionEnding })
639    }
640
641    /// Whether the computation was preempted.
642    #[inline]
643    fn is_preempted(&self, p: &Point) -> bool {
644        self.preemption_count.read_at(p) > 0
645    }
646
647    /// Interrupt a process with the specified identifier if the process
648    /// is held by computation `hold_process`. Returns an `Event` computation.
649    #[inline]
650    pub fn interrupt(pid: Grc<ProcessId>) -> Interrupt {
651        Interrupt { pid: pid }
652    }
653
654    /// Test whether the process with the specified identifier was interrupted.
655    /// Returns an `Event` computation.
656    #[inline]
657    pub fn is_interrupted(pid: Grc<ProcessId>) -> IsInterrupted {
658        IsInterrupted { pid: pid }
659    }
660
661    /// Return the expected interruption time after finishing the `hold_process` computation,
662    /// which value may change if the corresponding process is preempted.
663    /// Returns an `Event` computation.
664    #[inline]
665    pub fn interruption_time(pid: Grc<ProcessId>) -> InterruptionTime {
666        InterruptionTime { pid: pid }
667    }
668
669    /// Test whether the process with the specified identifier was passivated.
670    /// Returns an `Event` computation.
671    #[inline]
672    pub fn is_passivated(pid: Grc<ProcessId>) -> IsPassivated {
673        IsPassivated { pid: pid }
674    }
675
676    /// Reactivate a process with the specified identifier.
677    #[inline]
678    pub fn reactivate(pid: Grc<ProcessId>) -> Reactivate {
679        Reactivate { pid: pid }
680    }
681
682    /// Reactivate immediately a process with the specified identifier.
683    #[inline]
684    pub fn reactivate_immediately(pid: Grc<ProcessId>) -> ReactivateImmediately {
685        ReactivateImmediately { pid: pid }
686    }
687
688    /// Reactivate immediately the processes with specfified identifies.
689    #[inline]
690    pub fn reactivate_many_immediately(pids: List<Grc<ProcessId>>) -> ReactivateManyImmediately {
691        ReactivateManyImmediately { pids: pids }
692    }
693
    /// Define a reaction when the process with the specified identifier is preempted.
    ///
    /// Two cases are handled in the following order:
    ///
    /// * if there is a stored interruption continuation then it is taken out,
    ///   the process is marked as interrupted, the interruption version is increased,
    ///   and the continuation is substituted by one that holds the process for
    ///   the remaining time under the saved priority;
    /// * otherwise, if there is a stored reactivation continuation then it is
    ///   replaced by a substituted one that goes through `reenter_process`.
    ///
    /// If there is neither of the continuations then nothing happens.
    ///
    /// NOTE(review): `substitute_process_boxed` and `reenter_process` are defined
    /// outside of this part of the file - confirm their exact semantics there.
    fn on_preempted_at(pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
        // take the interruption continuation out, leaving `None` in the cell
        match pid.interrupt_cont.swap_at(None, p) {
            Some(cont) => {
                let t  = pid.interrupt_time.read_at(p);
                // the time that remains until the stored interruption time
                let dt = t - p.time;
                let v  = pid.interrupt_ver.read_at(p);
                let priority = pid.interrupt_priority.read_at(p);
                pid.interrupt_flag.write_at(Some(true), p);
                pid.interrupt_ver.write_at(v.wrapping_add(1), p);
                // the substituted continuation holds for the remaining time
                // under the saved priority before calling the original one
                let cont = substitute_process_boxed(cont, move |cont, pid, (), p| {
                    restore_process_priority(priority, hold_process(dt))
                        .call_process_boxed(cont, pid, p)
                });
                reenter_process(cont, pid, (), p)
            },
            None => {
                // take the reactivation continuation out, if there is one
                match pid.react_cont.swap_at(None, p) {
                    None => Result::Ok(()),
                    Some(cont) => {
                        let cont = substitute_process_boxed(cont, move |cont, pid, (), p| {
                            reenter_process(cont, pid, (), p)
                        });
                        // put the substituted continuation back to the cell
                        pid.react_cont.write_at(Some(cont), p);
                        Result::Ok(())
                    }
                }
            }
        }
    }
724}
725
impl PartialEq for ProcessId {

    // Two identifiers are compared through their `started` cells only.
    // NOTE(review): whether this is an identity comparison or a comparison of
    // the stored flags depends on how `RefComp` implements `PartialEq`, which
    // is not visible in this part of the file - confirm it is the identity.
    fn eq(&self, other: &Self) -> bool {
        self.started == other.started
    }
}

impl Eq for ProcessId {}
734
/// A computation that creates a `ProcessId` value.
///
/// It is returned by `ProcessId::new` and has no fields of its own.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct NewProcessId {}
739
740impl Simulation for NewProcessId {
741
742    type Item = ProcessId;
743
744    #[doc(hidden)]
745    fn call_simulation(self, r: &SimulationRun) -> simulation::Result<Self::Item> {
746        let specs = &r.specs;
747        Result::Ok(ProcessId {
748            cancel_initiated: RefComp::new(false),
749            cancel_activated: RefComp::new(false),
750            preemption_count: RefComp::new(0),
751            source: ObservableSource::new(),
752            started: RefComp::new(false),
753            react_flag: RefComp::new(None),
754            react_cont: RefComp::new(None),
755            react_priority: RefComp::new(0),
756            interrupt_flag: RefComp::new(None),
757            interrupt_cont: RefComp::new(None),
758            interrupt_time: RefComp::new(specs.start_time),
759            interrupt_ver: RefComp::new(0),
760            interrupt_priority: RefComp::new(0)
761        })
762    }
763}
764
/// The event that occurs within the `Process` computation.
///
/// The events are triggered by the source of the corresponding `ProcessId`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum ProcessEvent {

    /// The cancellation of the computation is initiated.
    CancelInitiating,

    /// The preemption of the computation is initiated.
    PreemptionInitiating,

    /// The computation proceeds after it was preempted.
    PreemptionEnding
}
778
779/// The computation based on continuations.
780pub trait Process {
781
782    /// The type of the item that is used within the computation.
783    type Item;
784
785    /// Call the `Process` computation.
786    #[doc(hidden)]
787    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
788        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + Clone + 'static;
789
790    /// Call the `Process` computation using the boxed continuation.
791    #[doc(hidden)]
792    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>;
793
794    /// Bind the current computation with its continuation within the resulting computation.
795    #[inline]
796    fn and_then<U, F>(self, f: F) -> AndThen<Self, U, F>
797        where Self: Sized + 'static,
798              U: Process + 'static,
799              F: FnOnce(Self::Item) -> U + Clone + 'static,
800    {
801        AndThen { comp: self, f: f, _phantom: PhantomData }
802    }
803
804    /// Map the current computation using the specified transform.
805    #[inline]
806    fn map<B, F>(self, f: F) -> Map<Self, B, F>
807        where Self: Sized + 'static,
808              F: FnOnce(Self::Item) -> B + Clone + 'static,
809    {
810        Map { comp: self, f: f, _phantom: PhantomData }
811    }
812
813    /// Zip the current computation with another one within the resulting computation.
814    #[inline]
815    fn zip<U>(self, other: U) -> Zip<Self, U>
816        where Self: Sized + 'static,
817              Self::Item: Clone,
818              U: Process + Clone + 'static
819    {
820        Zip { comp: self, other: other }
821    }
822
823    /// The function application.
824    #[inline]
825    fn ap<U, B>(self, other: U) -> Ap<Self, U, B>
826        where Self: Sized + 'static,
827              Self::Item: FnOnce(U::Item) -> B + Clone,
828              U: Process + Clone + 'static,
829              B: 'static
830    {
831        Ap { comp: self, other: other, _phantom: PhantomData }
832    }
833
834    /// Finalize the current computation regardless of canceling it or not.
835    #[inline]
836    fn finally<U>(self, finalization: U) -> Finally<Self, U>
837        where Self: Sized + 'static,
838              Self::Item: Clone,
839              U: Process<Item = ()> + Clone + 'static
840    {
841        Finally { comp: self, finalization: finalization }
842    }
843
844    /// Run the `Process` computation.
845    #[inline]
846    fn run(self) -> Run<Self>
847        where Self: Process<Item = ()> + Sized
848    {
849        Run { comp: self, pid: None }
850    }
851
852    /// Run the `Process` computation using the specified process identifier.
853    #[inline]
854    fn run_using_id(self, pid: Grc<ProcessId>) -> Run<Self>
855        where Self: Process<Item = ()> + Sized
856    {
857        Run { comp: self, pid: Some(pid) }
858    }
859
860    /// Convert into a boxed value.
861    #[inline]
862    fn into_boxed(self) -> ProcessBox<Self::Item>
863        where Self: Sized + Clone + 'static
864    {
865        ProcessBox::new(move |cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point| {
866            self.call_process_boxed(cont, pid, p)
867        })
868    }
869}
870
871/// Allows converting to `Process` computations.
872pub trait IntoProcess {
873
874    /// The target computation.
875    type Process: Process<Item = Self::Item>;
876
877    /// The type of item that is returned by the computation.
878    type Item;
879
880    /// Convert to the `Process` computation.
881    fn into_process(self) -> Self::Process;
882}
883
884impl<M: Process> IntoProcess for M {
885
886    type Process = M;
887
888    type Item = M::Item;
889
890    #[inline]
891    fn into_process(self) -> Self::Process {
892        self
893    }
894}
895
896/// The continuation of the boxed `Process` computation.
897#[doc(hidden)]
898pub struct ProcessBoxCont<T> {
899    f: Box<dyn ProcessContFnBoxClone<T>>
900}
901
902impl<T> ProcessBoxCont<T> {
903
904    /// Create a new continuation box.
905    #[inline]
906    pub fn new<C>(cont: C) -> Self
907        where C: FnOnce(simulation::Result<T>, Grc<ProcessId>, &Point) -> simulation::Result<()> + Clone + 'static
908    {
909        ProcessBoxCont {
910            f: Box::new(cont)
911        }
912    }
913
914    /// Call the boxed function.
915    #[inline]
916    pub fn call_box(self, arg: (simulation::Result<T>, Grc<ProcessId>, &Point)) -> simulation::Result<()> {
917        let ProcessBoxCont { f } = self;
918        f.call_box(arg)
919    }
920}
921
922impl<T> Clone for ProcessBoxCont<T> {
923
924    #[inline]
925    fn clone(&self) -> Self {
926        ProcessBoxCont {
927            f: self.f.call_clone()
928        }
929    }
930}
931
932/// A trait to support the stable version of Rust, where there is no `FnBox`.
933trait ProcessContFnBox<T> {
934
935    /// Call the corresponding function.
936    fn call_box(self: Box<Self>, args: (simulation::Result<T>, Grc<ProcessId>, &Point)) -> simulation::Result<()>;
937}
938
939impl<T, F> ProcessContFnBox<T> for F
940    where F: for<'a> FnOnce(simulation::Result<T>, Grc<ProcessId>, &'a Point) -> simulation::Result<()>
941{
942    fn call_box(self: Box<Self>, args: (simulation::Result<T>, Grc<ProcessId>, &Point)) -> simulation::Result<()> {
943        let this: Self = *self;
944        this(args.0, args.1, args.2)
945    }
946}
947
948/// A trait to implement a cloneable `FnBox`.
949trait ProcessContFnBoxClone<T>: ProcessContFnBox<T> {
950
951    /// Clone the function.
952    fn call_clone(&self) -> Box<dyn ProcessContFnBoxClone<T>>;
953}
954
955impl<T, F> ProcessContFnBoxClone<T> for F
956    where F: for<'a> FnOnce(simulation::Result<T>, Grc<ProcessId>, &'a Point) -> simulation::Result<()> + Clone + 'static
957{
958    fn call_clone(&self) -> Box<dyn ProcessContFnBoxClone<T>> {
959        Box::new(self.clone())
960    }
961}
962
963/// The `Process` computation box.
964#[must_use = "computations are lazy and do nothing unless to be run"]
965pub struct ProcessBox<T> {
966    f: Box<dyn ProcessFnBoxClone<T>>
967}
968
969#[doc(hidden)]
970impl<T> ProcessBox<T> {
971
972    #[inline]
973    pub fn new<F>(f: F) -> Self
974        where F: FnOnce(ProcessBoxCont<T>, Grc<ProcessId>, &Point) -> simulation::Result<()> + Clone + 'static
975    {
976        ProcessBox {
977            f: Box::new(f)
978        }
979    }
980
981    #[inline]
982    pub fn call_box(self, args: (ProcessBoxCont<T>, Grc<ProcessId>, &Point)) -> simulation::Result<()> {
983        let ProcessBox { f } = self;
984        f.call_box(args)
985    }
986}
987
988impl<T> Clone for ProcessBox<T> {
989
990    #[inline]
991    fn clone(&self) -> Self {
992        ProcessBox {
993            f: self.f.call_clone()
994        }
995    }
996}
997
998impl<T> Process for ProcessBox<T> {
999
1000    type Item = T;
1001
1002    #[doc(hidden)]
1003    #[inline]
1004    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
1005        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + Clone + 'static
1006    {
1007        let cont = ProcessBoxCont::new(cont);
1008        self.call_box((cont, pid, p))
1009    }
1010
1011    #[doc(hidden)]
1012    #[inline]
1013    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
1014        self.call_box((cont, pid, p))
1015    }
1016
1017    #[inline]
1018    fn into_boxed(self) -> ProcessBox<Self::Item>
1019        where Self: Sized + 'static
1020    {
1021        self
1022    }
1023}
1024
1025/// A trait to support the stable version of Rust, where there is no `FnBox`.
1026trait ProcessFnBox<T> {
1027
1028    /// Call the corresponding function.
1029    fn call_box(self: Box<Self>, args: (ProcessBoxCont<T>, Grc<ProcessId>, &Point)) -> simulation::Result<()>;
1030}
1031
1032impl<T, F> ProcessFnBox<T> for F
1033    where F: for<'a> FnOnce(ProcessBoxCont<T>, Grc<ProcessId>, &'a Point) -> simulation::Result<()>
1034{
1035    fn call_box(self: Box<Self>, args: (ProcessBoxCont<T>, Grc<ProcessId>, &Point)) -> simulation::Result<()> {
1036        let this: Self = *self;
1037        this(args.0, args.1, args.2)
1038    }
1039}
1040
1041/// A trait to implement a cloneable `FnBox`.
1042trait ProcessFnBoxClone<T>: ProcessFnBox<T> {
1043
1044    /// Clone the function.
1045    fn call_clone(&self) -> Box<dyn ProcessFnBoxClone<T>>;
1046}
1047
1048impl<T, F> ProcessFnBoxClone<T> for F
1049    where F: for<'a> FnOnce(ProcessBoxCont<T>, Grc<ProcessId>, &'a Point) -> simulation::Result<()> + Clone + 'static
1050{
1051    fn call_clone(&self) -> Box<dyn ProcessFnBoxClone<T>> {
1052        Box::new(self.clone())
1053    }
1054}
1055
1056/// Allows creating the `Process` computation from a pure value.
1057#[must_use = "computations are lazy and do nothing unless to be run"]
1058#[derive(Clone)]
1059pub struct Return<T> {
1060
1061    /// Return a pure value, which is then transformed to the computation.
1062    val: T
1063}
1064
1065impl<T> Process for Return<T> {
1066
1067    type Item = T;
1068
1069    #[doc(hidden)]
1070    #[inline]
1071    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
1072        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + Clone + 'static
1073    {
1074        if is_process_cancelled(&pid, p) {
1075            revoke_process(cont, pid, p)
1076        } else {
1077            let Return { val } = self;
1078            cont(Result::Ok(val), pid, p)
1079        }
1080    }
1081
1082    #[doc(hidden)]
1083    #[inline]
1084    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
1085        if is_process_cancelled(&pid, p) {
1086            revoke_process_boxed(cont, pid, p)
1087        } else {
1088            let Return { val } = self;
1089            cont.call_box((Result::Ok(val), pid, p))
1090        }
1091    }
1092}
1093
1094/// Allows delaying the `Process` computation by the specified function.
1095#[must_use = "computations are lazy and do nothing unless to be run"]
1096#[derive(Clone)]
1097pub struct Delay<F, M> {
1098
1099    /// Return the computation.
1100    f: F,
1101
1102    /// To keep the type parameter.
1103    _phantom: PhantomData<M>
1104}
1105
1106impl<F, M> Process for Delay<F, M>
1107    where F: FnOnce() -> M + Clone + 'static,
1108          M: Process + 'static
1109{
1110    type Item = M::Item;
1111
1112    #[doc(hidden)]
1113    #[inline]
1114    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
1115        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + Clone + 'static
1116    {
1117        let Delay { f, _phantom } = self;
1118        return_process(()).and_then(move |_| {
1119            f()
1120        }).call_process(cont, pid, p)
1121    }
1122
1123    #[doc(hidden)]
1124    #[inline]
1125    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
1126        let Delay { f, _phantom } = self;
1127        return_process(()).and_then(move |_| {
1128            f()
1129        }).call_process_boxed(cont, pid, p)
1130    }
1131}
1132
1133/// The monadic bind for the `Process` computation.
1134#[must_use = "computations are lazy and do nothing unless to be run"]
1135#[derive(Clone)]
1136pub struct AndThen<M, U, F> {
1137
1138    /// The current computation.
1139    comp: M,
1140
1141    /// The continuation of the current computation.
1142    f: F,
1143
1144    /// To keep the type parameter.
1145    _phantom: PhantomData<U>
1146}
1147
1148impl<M, U, F> Process for AndThen<M, U, F>
1149    where F: FnOnce(M::Item) -> U + Clone + 'static,
1150          M: Process + 'static,
1151          U: Process + 'static
1152{
1153    type Item = U::Item;
1154
1155    #[doc(hidden)]
1156    #[inline]
1157    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
1158        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + Clone + 'static
1159    {
1160        // m >>= k = \c -> m (\a -> k a c)
1161
1162        if is_process_cancelled(&pid, p) {
1163            revoke_process(cont, pid, p)
1164        } else {
1165            let AndThen { comp, f, _phantom } = self;
1166            let cont = move |a: simulation::Result<M::Item>, pid: Grc<ProcessId>, p: &Point| {
1167                match a {
1168                    Result::Ok(a) => {
1169                        f(a).call_process(cont, pid, p)
1170                    },
1171                    Result::Err(Error::Cancel) => {
1172                        revoke_process(cont, pid, p)
1173                    },
1174                    Result::Err(Error::Other(e)) => {
1175                        match e.deref() {
1176                            &OtherError::Retry(_) => {
1177                                cut_error_process(cont, pid, e, p)
1178                            },
1179                            &OtherError::Panic(_) => {
1180                                cut_error_process(cont, pid, e, p)
1181                            },
1182                            &OtherError::IO(_) => {
1183                                propagate_error_process(cont, pid, e, p)
1184                            }
1185                        }
1186                    }
1187                }
1188            };
1189            comp.call_process(cont, pid, p)
1190        }
1191    }
1192
1193    #[doc(hidden)]
1194    #[inline]
1195    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
1196        // m >>= k = \c -> m (\a -> k a c)
1197
1198        if is_process_cancelled(&pid, p) {
1199            revoke_process_boxed(cont, pid, p)
1200        } else {
1201            let AndThen { comp, f, _phantom } = self;
1202            let cont = move |a: simulation::Result<M::Item>, pid: Grc<ProcessId>, p: &Point| {
1203                match a {
1204                    Result::Ok(a) => {
1205                        f(a).call_process_boxed(cont, pid, p)
1206                    },
1207                    Result::Err(Error::Cancel) => {
1208                        revoke_process_boxed(cont, pid, p)
1209                    },
1210                    Result::Err(Error::Other(e)) => {
1211                        match e.deref() {
1212                            &OtherError::Retry(_) => {
1213                                cut_error_process_boxed(cont, pid, e, p)
1214                            },
1215                            &OtherError::Panic(_) => {
1216                                cut_error_process_boxed(cont, pid, e, p)
1217                            },
1218                            &OtherError::IO(_) => {
1219                                propagate_error_process_boxed(cont, pid, e, p)
1220                            }
1221                        }
1222                    }
1223                }
1224            };
1225            comp.call_process(cont, pid, p)
1226        }
1227    }
1228}
1229
1230/// The finally block for the `Process` computation.
1231#[must_use = "computations are lazy and do nothing unless to be run"]
1232#[derive(Clone)]
1233pub struct Finally<M, U> {
1234
1235    /// The current computation.
1236    comp: M,
1237
1238    /// The finalization computation.
1239    finalization: U
1240}
1241
1242impl<M, U> Process for Finally<M, U>
1243    where M: Process + 'static,
1244          M::Item: Clone,
1245          U: Process<Item = ()> + Clone + 'static
1246{
1247    type Item = M::Item;
1248
1249    #[doc(hidden)]
1250    #[inline]
1251    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
1252        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + Clone + 'static
1253    {
1254        if is_process_cancelled(&pid, p) {
1255            revoke_process(cont, pid, p)
1256        } else {
1257            let Finally { comp, finalization } = self;
1258            let cont = move |a: simulation::Result<M::Item>, pid: Grc<ProcessId>, p: &Point| {
1259                let cont = move |a0: simulation::Result<()>, pid: Grc<ProcessId>, p: &Point| {
1260                    match a0 {
1261                        Result::Ok(()) => cont(a, pid, p),
1262                        Result::Err(e0) => {
1263                            match a {
1264                                Result::Ok(_)  => cont(Result::Err(e0), pid, p),
1265                                Result::Err(e) => cont(Result::Err(e.merge(&e0)), pid, p)
1266                            }
1267                        }
1268                    }
1269                };
1270                finalization.call_process(cont, pid, p)
1271            };
1272            comp.call_process(cont, pid, p)
1273        }
1274    }
1275
1276    #[doc(hidden)]
1277    #[inline]
1278    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
1279        if is_process_cancelled(&pid, p) {
1280            revoke_process_boxed(cont, pid, p)
1281        } else {
1282            let Finally { comp, finalization } = self;
1283            let cont = move |a: simulation::Result<M::Item>, pid: Grc<ProcessId>, p: &Point| {
1284                let cont = move |a0: simulation::Result<()>, pid: Grc<ProcessId>, p: &Point| {
1285                    match a0 {
1286                        Result::Ok(()) => cont.call_box((a, pid, p)),
1287                        Result::Err(e0) => {
1288                            match a {
1289                                Result::Ok(_)  => cont.call_box((Result::Err(e0), pid, p)),
1290                                Result::Err(e) => cont.call_box((Result::Err(e.merge(&e0)), pid, p))
1291                            }
1292                        }
1293                    }
1294                };
1295                finalization.call_process(cont, pid, p)
1296            };
1297            comp.call_process(cont, pid, p)
1298        }
1299    }
1300}
1301
1302/// The functor for the `Process` computation.
1303#[must_use = "computations are lazy and do nothing unless to be run"]
1304#[derive(Clone)]
1305pub struct Map<M, B, F> {
1306
1307    /// The current computation.
1308    comp: M,
1309
1310    /// The transform.
1311    f: F,
1312
1313    /// To keep the type parameter.
1314    _phantom: PhantomData<B>
1315}
1316
1317impl<M, B, F> Process for Map<M, B, F>
1318    where F: FnOnce(M::Item) -> B + Clone + 'static,
1319          M: Process + 'static,
1320          B: 'static
1321{
1322    type Item = B;
1323
1324    #[doc(hidden)]
1325    #[inline]
1326    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
1327        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + Clone + 'static
1328    {
1329        let Map { comp, f, _phantom } = self;
1330        comp.and_then(move |a| {
1331            return_process(f(a))
1332        }).call_process(cont, pid, p)
1333    }
1334
1335    #[doc(hidden)]
1336    #[inline]
1337    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
1338        let Map { comp, f, _phantom } = self;
1339        comp.and_then(move |a| {
1340            return_process(f(a))
1341        }).call_process_boxed(cont, pid, p)
1342    }
1343}
1344
1345/// The zip of two `Process` computations.
1346#[must_use = "computations are lazy and do nothing unless to be run"]
1347#[derive(Clone)]
1348pub struct Zip<M, U> {
1349
1350    /// The current computation.
1351    comp: M,
1352
1353    /// Another computation.
1354    other: U,
1355}
1356
1357impl<M, U> Process for Zip<M, U>
1358    where M: Process + 'static,
1359          M::Item: Clone,
1360          U: Process + Clone + 'static
1361{
1362    type Item = (M::Item, U::Item);
1363
1364    #[doc(hidden)]
1365    #[inline]
1366    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
1367        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + Clone + 'static
1368    {
1369        let Zip { comp, other } = self;
1370        comp.and_then(move |a| {
1371            other.and_then(move |b| {
1372                return_process((a, b))
1373            })
1374        }).call_process(cont, pid, p)
1375    }
1376
1377    #[doc(hidden)]
1378    #[inline]
1379    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
1380        let Zip { comp, other } = self;
1381        comp.and_then(move |a| {
1382            other.and_then(move |b| {
1383                return_process((a, b))
1384            })
1385        }).call_process_boxed(cont, pid, p)
1386    }
1387}
1388
1389/// The function application for the `Process` computation.
1390#[must_use = "computations are lazy and do nothing unless to be run"]
1391#[derive(Clone)]
1392pub struct Ap<M, U, B> {
1393
1394    /// The current computation.
1395    comp: M,
1396
1397    /// The continuation of the current computation.
1398    other: U,
1399
1400    /// To keep the type parameter.
1401    _phantom: PhantomData<B>
1402}
1403
1404impl<M, U, B> Process for Ap<M, U, B>
1405    where M: Process + 'static,
1406          U: Process + Clone + 'static,
1407          M::Item: FnOnce(U::Item) -> B + Clone,
1408          B: 'static
1409{
1410    type Item = B;
1411
1412    #[doc(hidden)]
1413    #[inline]
1414    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
1415        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + Clone + 'static
1416    {
1417        let Ap { comp, other, _phantom } = self;
1418        comp.and_then(move |f| {
1419            other.and_then(move |a| {
1420                return_process(f(a))
1421            })
1422        }).call_process(cont, pid, p)
1423    }
1424
1425    #[doc(hidden)]
1426    #[inline]
1427    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
1428        let Ap { comp, other, _phantom } = self;
1429        comp.and_then(move |f| {
1430            other.and_then(move |a| {
1431                return_process(f(a))
1432            })
1433        }).call_process_boxed(cont, pid, p)
1434    }
1435}
1436
1437/// Run the computation.
1438#[must_use = "computations are lazy and do nothing unless to be run"]
1439#[derive(Clone)]
1440pub struct Run<M> {
1441
1442    /// The current computation.
1443    comp: M,
1444
1445    /// The process identifier.
1446    pid: Option<Grc<ProcessId>>
1447}
1448
1449impl<M> Event for Run<M>
1450    where M: Process<Item = ()>
1451{
1452    type Item = M::Item;
1453
1454    #[doc(hidden)]
1455    #[inline]
1456    fn call_event(self, p: &Point) -> simulation::Result<M::Item> {
1457        let Run { comp, pid } = self;
1458        let pid = {
1459            match pid {
1460                Some(pid) => pid,
1461                None => {
1462                    match ProcessId::new().into_event().call_event(p) {
1463                        Result::Ok(pid) => Grc::new(pid),
1464                        Result::Err(e) => return Result::Err(e)
1465                    }
1466                }
1467            }
1468        };
1469        let cont = &initial_cont;
1470        match ProcessId::prepare(pid.clone(), p) {
1471            Result::Err(e) => Result::Err(e),
1472            Result::Ok(()) => comp.call_process(cont, pid, p)
1473        }
1474    }
1475}
1476
1477/// The initial continuation.
1478fn initial_cont(_a: simulation::Result<()>, _pid: Grc<ProcessId>, _p: &Point) -> simulation::Result<()> {
1479    Result::Ok(())
1480}
1481
1482/// Return the corresponding process identifier.
1483#[must_use = "computations are lazy and do nothing unless to be run"]
1484#[derive(Clone)]
1485pub struct Id {}
1486
1487impl Process for Id {
1488
1489    type Item = Grc<ProcessId>;
1490
1491    #[doc(hidden)]
1492    #[inline]
1493    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
1494        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + Clone + 'static
1495    {
1496        cont(Result::Ok(pid.clone()), pid, p)
1497    }
1498
1499    #[doc(hidden)]
1500    #[inline]
1501    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
1502        cont.call_box((Result::Ok(pid.clone()), pid, p))
1503    }
1504}
1505
1506/// Cancel a process by the specified process identifier.
1507#[must_use = "computations are lazy and do nothing unless to be run"]
1508#[derive(Clone)]
1509pub struct CancelById {
1510
1511    /// The process identifier.
1512    pid: Grc<ProcessId>
1513}
1514
1515impl Event for CancelById {
1516
1517    type Item = ();
1518
1519    #[doc(hidden)]
1520    #[inline]
1521    fn call_event(self, p: &Point) -> simulation::Result<()> {
1522        let CancelById { pid } = self;
1523        pid.initiate_cancel_at(p)
1524    }
1525}
1526
1527/// Cancel the current process.
1528#[must_use = "computations are lazy and do nothing unless to be run"]
1529#[derive(Clone)]
1530pub struct Cancel<T> {
1531
1532    /// To keep the type parameter.
1533    _phantom: PhantomData<T>
1534}
1535
1536impl<T> Process for Cancel<T> {
1537
1538    type Item = T;
1539
1540    #[doc(hidden)]
1541    #[inline]
1542    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
1543        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + Clone + 'static
1544    {
1545        match pid.initiate_cancel_at(p) {
1546            Result::Ok(()) => {
1547                if is_process_cancelled(&pid, p) {
1548                    revoke_process(cont, pid, p)
1549                } else {
1550                    Result::Ok(())
1551                }
1552            },
1553            Result::Err(e) => Result::Err(e)
1554        }
1555    }
1556
1557    #[doc(hidden)]
1558    #[inline]
1559    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
1560        match pid.initiate_cancel_at(p) {
1561            Result::Ok(()) => {
1562                if is_process_cancelled(&pid, p) {
1563                    revoke_process_boxed(cont, pid, p)
1564                } else {
1565                    Result::Ok(())
1566                }
1567            },
1568            Result::Err(e) => Result::Err(e)
1569        }
1570    }
1571}
1572
1573/// The hold block for the `Process` computation.
1574#[must_use = "computations are lazy and do nothing unless to be run"]
1575#[derive(Clone)]
1576pub struct Hold {
1577
1578    /// The time interval of holding the process.
1579    dt: f64
1580}
1581
1582impl Process for Hold {
1583
1584    type Item = ();
1585
1586    #[doc(hidden)]
1587    #[inline]
1588    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
1589        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + Clone + 'static
1590    {
1591        let cont = ProcessBoxCont::new(cont);
1592        self.call_process_boxed(cont, pid, p)
1593    }
1594
1595    #[doc(hidden)]
1596    #[inline]
1597    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
1598        let Hold { dt } = self;
1599        let t = p.time + dt;
1600        pid.interrupt_cont.write_at(Some(cont), p);
1601        pid.interrupt_flag.write_at(Some(false), p);
1602        pid.interrupt_time.write_at(t, p);
1603        pid.interrupt_priority.write_at(p.priority, p);
1604        let v = pid.interrupt_ver.read_at(p);
1605        enqueue_event(t, {
1606            cons_event(move |p| {
1607                if p.priority >= p.minimal_priority {
1608                    let v2 = pid.interrupt_ver.read_at(p);
1609                    if v == v2 {
1610                        pid.interrupt_flag.write_at(None, p);
1611                        let cont = pid.interrupt_cont.swap_at(None, p).unwrap();
1612                        resume_process_boxed(cont, pid, (), p)
1613                    } else {
1614                        Result::Ok(())
1615                    }
1616
1617                } else {
1618                    pid.initiate_cancel_at(p)?;
1619                    if is_process_cancelled(&pid, p) {
1620                        let v2 = pid.interrupt_ver.read_at(p);
1621                        if v == v2 {
1622                            pid.interrupt_flag.write_at(None, p);
1623                            let cont = pid.interrupt_cont.swap_at(None, p).unwrap();
1624                            revoke_process_boxed(cont, pid, p)
1625                        } else {
1626                            Result::Ok(())
1627                        }
1628                    } else {
1629                        Result::Ok(())
1630                    }
1631                }
1632            }).into_boxed()
1633        }).call_event(p)
1634    }
1635}
1636
1637/// Interrupt a process with the specified identifier if the process
1638/// is held by computation `hold_process`.
1639#[must_use = "computations are lazy and do nothing unless to be run"]
1640#[derive(Clone)]
1641pub struct Interrupt {
1642
1643    /// The process identifier.
1644    pid: Grc<ProcessId>
1645}
1646
1647impl Event for Interrupt {
1648
1649    type Item = ();
1650
1651    #[doc(hidden)]
1652    #[inline]
1653    fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
1654        let Interrupt { pid } = self;
1655        match pid.interrupt_cont.swap_at(None, p) {
1656            None => Result::Ok(()),
1657            Some(cont) => {
1658                let v = pid.interrupt_ver.read_at(p);
1659                let priority = pid.interrupt_priority.read_at(p);
1660                pid.interrupt_ver.write_at(v.wrapping_add(1), p);
1661                pid.interrupt_flag.write_at(Some(true), p);
1662                enqueue_event_with_priority(p.time, priority, {
1663                    cons_event(move |p| {
1664                        resume_process_boxed(cont, pid, (), p)
1665                    }).into_boxed()
1666                }).call_event(p)
1667            }
1668        }
1669    }
1670}
1671
1672/// Test whether the process with the specified identifier was interrupted.
1673#[must_use = "computations are lazy and do nothing unless to be run"]
1674#[derive(Clone)]
1675pub struct IsInterrupted {
1676
1677    /// The process identifier.
1678    pid: Grc<ProcessId>
1679}
1680
1681impl Event for IsInterrupted {
1682
1683    type Item = bool;
1684
1685    #[doc(hidden)]
1686    #[inline]
1687    fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
1688        let IsInterrupted { pid } = self;
1689        match pid.interrupt_flag.read_at(p) {
1690            Some(true) => Result::Ok(true),
1691            _ => Result::Ok(false)
1692        }
1693    }
1694}
1695
1696/// Return the expected interruption time after finishing the `hold_process` computation,
1697/// which value may change if the corresponding process is preempted.
1698#[must_use = "computations are lazy and do nothing unless to be run"]
1699#[derive(Clone)]
1700pub struct InterruptionTime {
1701
1702    /// The process identifier.
1703    pid: Grc<ProcessId>
1704}
1705
1706impl Event for InterruptionTime {
1707
1708    type Item = Option<f64>;
1709
1710    #[doc(hidden)]
1711    #[inline]
1712    fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
1713        let InterruptionTime { pid } = self;
1714        match pid.interrupt_flag.read_at(p) {
1715            Some(false) => Result::Ok(Some(pid.interrupt_time.read_at(p))),
1716            _ => Result::Ok(None)
1717        }
1718    }
1719}
1720
1721/// Passivate the `Process` computation.
1722#[must_use = "computations are lazy and do nothing unless to be run"]
1723#[derive(Clone)]
1724pub struct Passivate {}
1725
1726impl Process for Passivate {
1727
1728    type Item = ();
1729
1730    #[doc(hidden)]
1731    #[inline]
1732    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
1733        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + Clone + 'static
1734    {
1735        let cont = ProcessBoxCont::new(cont);
1736        self.call_process_boxed(cont, pid, p)
1737    }
1738
1739    #[doc(hidden)]
1740    #[inline]
1741    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
1742        match pid.react_flag.read_at(p) {
1743            Some(true) => {
1744                let msg = String::from("Cannot passivate the process twice");
1745                let err = OtherError::retry(msg);
1746                cut_error_process_boxed(cont, pid, err, p)
1747            },
1748            _ => {
1749                match pid.react_cont.swap_at(Some(cont), p) {
1750                    Some(_) => panic!("The reactivation continuation has diverged"),
1751                    None => {
1752                        pid.react_priority.write_at(p.priority, p);
1753                        pid.react_flag.write_at(Some(true), p);
1754                        Result::Ok(())
1755                    }
1756                }
1757            }
1758        }
1759    }
1760}
1761
1762/// Passivate the `Process` computation before performing some action.
1763#[must_use = "computations are lazy and do nothing unless to be run"]
1764#[derive(Clone)]
1765pub struct PassivateBefore<M> {
1766
1767    /// The action to perform after passivating the process.
1768    comp: M
1769}
1770
1771impl<M> Process for PassivateBefore<M>
1772    where M: Event<Item = ()>
1773{
1774    type Item = ();
1775
1776    #[doc(hidden)]
1777    #[inline]
1778    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
1779        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + Clone + 'static
1780    {
1781        let cont = ProcessBoxCont::new(cont);
1782        self.call_process_boxed(cont, pid, p)
1783    }
1784
1785    #[doc(hidden)]
1786    #[inline]
1787    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
1788        match pid.react_flag.read_at(p) {
1789            Some(true) => {
1790                let msg = String::from("Cannot passivate the process twice");
1791                let err = OtherError::retry(msg);
1792                cut_error_process_boxed(cont, pid, err, p)
1793            },
1794            _ => {
1795                match pid.react_cont.swap_at(Some(cont), p) {
1796                    Some(_) => panic!("The reactivation continuation has diverged"),
1797                    None => {
1798                        let PassivateBefore { comp } = self;
1799                        pid.react_priority.write_at(p.priority, p);
1800                        pid.react_flag.write_at(Some(true), p);
1801                        comp.call_event(p)
1802                    }
1803                }
1804            }
1805        }
1806    }
1807}
1808
1809/// Test whether the process with the specified identifier was passivated.
1810#[must_use = "computations are lazy and do nothing unless to be run"]
1811#[derive(Clone)]
1812pub struct IsPassivated {
1813
1814    /// The process identifier.
1815    pid: Grc<ProcessId>
1816}
1817
1818impl Event for IsPassivated {
1819
1820    type Item = bool;
1821
1822    #[doc(hidden)]
1823    #[inline]
1824    fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
1825        let IsPassivated { pid } = self;
1826        match pid.react_flag.read_at(p) {
1827            Some(true) => Result::Ok(true),
1828            _ => Result::Ok(false)
1829        }
1830    }
1831}
1832
1833/// Reactivate the process with the specified identifier.
1834#[must_use = "computations are lazy and do nothing unless to be run"]
1835#[derive(Clone)]
1836pub struct Reactivate {
1837
1838    /// The process identifier.
1839    pid: Grc<ProcessId>
1840}
1841
1842impl Event for Reactivate {
1843
1844    type Item = ();
1845
1846    #[doc(hidden)]
1847    #[inline]
1848    fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
1849        let Reactivate { pid } = self;
1850        match pid.react_cont.swap_at(None, p) {
1851            Some(cont) => {
1852                match pid.react_flag.swap_at(None, p) {
1853                    Some(true) => {
1854                        let priority = pid.react_priority.read_at(p);
1855                        enqueue_event_with_priority(p.time, priority, {
1856                            cons_event(move |p| {
1857                                resume_process_boxed(cont, pid, (), p)
1858                            }).into_boxed()
1859                        }).call_event(p)
1860                    },
1861                    _ => {
1862                        panic!("The reactivation continuation has diverged")
1863                    }
1864                }
1865            },
1866            None => {
1867                Result::Ok(())
1868            }
1869        }
1870    }
1871}
1872
1873/// Reactivate immediately the process with the specified identifier.
1874#[must_use = "computations are lazy and do nothing unless to be run"]
1875#[derive(Clone)]
1876pub struct ReactivateImmediately {
1877
1878    /// The process identifier.
1879    pid: Grc<ProcessId>
1880}
1881
1882impl Event for ReactivateImmediately {
1883
1884    type Item = ();
1885
1886    #[doc(hidden)]
1887    #[inline]
1888    fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
1889        let ReactivateImmediately { pid } = self;
1890        match pid.react_cont.swap_at(None, p) {
1891            Some(cont) => {
1892                match pid.react_flag.swap_at(None, p) {
1893                    Some(true) => {
1894                        let priority = pid.react_priority.read_at(p);
1895                        if p.priority == priority {
1896                            resume_process_boxed(cont, pid, (), p)
1897                        } else {
1898                            enqueue_event_with_priority(p.time, priority, {
1899                                cons_event(move |p| {
1900                                    resume_process_boxed(cont, pid, (), p)
1901                                }).into_boxed()
1902                            }).call_event(p)
1903                        }
1904                    },
1905                    _ => {
1906                        panic!("The reactivation continuation has diverged")
1907                    }
1908                }
1909            },
1910            None => {
1911                Result::Ok(())
1912            }
1913        }
1914    }
1915}
1916
1917/// Reactivate immediately the processes with specified identifiers.
1918#[must_use = "computations are lazy and do nothing unless to be run"]
1919#[derive(Clone)]
1920pub struct ReactivateManyImmediately {
1921
1922    /// The process identifiers.
1923    pids: List<Grc<ProcessId>>
1924}
1925
1926impl Event for ReactivateManyImmediately {
1927
1928    type Item = ();
1929
1930    #[doc(hidden)]
1931    #[inline]
1932    fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
1933        let ReactivateManyImmediately { pids } = self;
1934        match pids {
1935            List::Nil => Result::Ok(()),
1936            List::Cons(pid, pids) => {
1937                match ProcessId::reactivate_immediately(pid).call_event(p) {
1938                    Result::Err(e) => Result::Err(e),
1939                    Result::Ok(()) => {
1940                        let pids = pids.as_list();
1941                        let comp = ReactivateManyImmediately { pids: pids };
1942                        yield_event(comp).call_event(p)
1943                    }
1944                }
1945            }
1946        }
1947    }
1948}
1949
1950/// Executes the `Process` computation in loop.
1951#[must_use = "computations are lazy and do nothing unless to be run"]
1952#[derive(Clone)]
1953pub struct Loop<F, M> {
1954
1955    /// The computation generator.
1956    f: F,
1957
1958    /// The type keeper.
1959    _phantom: PhantomData<M>
1960}
1961
1962impl<F, M> Process for Loop<F, M>
1963    where F: Fn() -> M + Clone + 'static,
1964          M: Process<Item = ()> + 'static
1965{
1966    type Item = M::Item;
1967
1968    #[doc(hidden)]
1969    #[inline(never)]
1970    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
1971        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + Clone + 'static
1972    {
1973        let Loop { f, _phantom } = self;
1974        let comp = f();
1975        comp.and_then(move |()| {
1976            Loop { f: f, _phantom: PhantomData }
1977        }).call_process(cont, pid, p)
1978    }
1979
1980    #[doc(hidden)]
1981    #[inline(never)]
1982    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
1983        let Loop { f, _phantom } = self;
1984        let comp = f();
1985        comp.and_then(move |()| {
1986            Loop { f: f, _phantom: PhantomData }
1987        }).call_process_boxed(cont, pid, p)
1988    }
1989}
1990
1991/// The sequence of computations.
1992#[must_use = "computations are lazy and do nothing unless to be run"]
1993#[derive(Clone)]
1994pub struct Sequence<M> where M: Process {
1995
1996    /// The computations.
1997    comps: List<M>,
1998
1999    /// The accumulator of results.
2000    acc: List<M::Item>
2001}
2002
2003impl<M> Process for Sequence<M>
2004    where M: Process + Clone + 'static,
2005          M::Item: Clone
2006{
2007    type Item = List<M::Item>;
2008
2009    #[doc(hidden)]
2010    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
2011        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + Clone + 'static
2012    {
2013        let Sequence { comps, acc } = self;
2014        match comps {
2015            List::Nil => cont(Result::Ok(acc.reversed()), pid, p),
2016            List::Cons(comp, comps) => {
2017                comp.and_then(move |a| {
2018                    let comps = comps.as_list();
2019                    let acc = List::Cons(a, Rc::new(acc));
2020                    Sequence { comps: comps, acc: acc }
2021                }).call_process(cont, pid, p)
2022            }
2023        }
2024    }
2025
2026    #[doc(hidden)]
2027    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
2028        let Sequence { comps, acc } = self;
2029        match comps {
2030            List::Nil => cont.call_box((Result::Ok(acc.reversed()), pid, p)),
2031            List::Cons(comp, comps) => {
2032                comp.and_then(move |a| {
2033                    let comps = comps.as_list();
2034                    let acc = List::Cons(a, Rc::new(acc));
2035                    Sequence { comps: comps, acc: acc }
2036                }).call_process_boxed(cont, pid, p)
2037            }
2038        }
2039    }
2040}
2041
2042/// The sequence of computations with ignored result.
2043#[must_use = "computations are lazy and do nothing unless to be run"]
2044#[derive(Clone)]
2045pub struct Sequence_<M>
2046{
2047    /// The computations.
2048    comps: List<M>
2049}
2050
2051impl<M> Process for Sequence_<M>
2052    where M: Process + Clone + 'static,
2053          M::Item: Clone
2054{
2055    type Item = ();
2056
2057    #[doc(hidden)]
2058    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
2059        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + Clone + 'static
2060    {
2061        let Sequence_ { comps } = self;
2062        match comps {
2063            List::Nil => cont(Result::Ok(()), pid, p),
2064            List::Cons(comp, comps) => {
2065                comp.and_then(move |_| {
2066                    let comps = comps.as_list();
2067                    Sequence_ { comps: comps }
2068                }).call_process(cont, pid, p)
2069            }
2070        }
2071    }
2072
2073    #[doc(hidden)]
2074    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
2075        let Sequence_ { comps } = self;
2076        match comps {
2077            List::Nil => cont.call_box((Result::Ok(()), pid, p)),
2078            List::Cons(comp, comps) => {
2079                comp.and_then(move |_| {
2080                    let comps = comps.as_list();
2081                    Sequence_ { comps: comps }
2082                }).call_process_boxed(cont, pid, p)
2083            }
2084        }
2085    }
2086}
2087
2088/// Allows spawning another process bound with the current one.
2089#[must_use = "computations are lazy and do nothing unless to be run"]
2090#[derive(Clone)]
2091pub struct SpawnUsingId<M>
2092{
2093    /// How to cancel the processes.
2094    cancellation: ProcessCancellation,
2095
2096    /// The computation.
2097    comp: M,
2098
2099    /// The child process identifier.
2100    comp_id: Grc<ProcessId>
2101}
2102
2103impl<M> Process for SpawnUsingId<M>
2104    where M: Process<Item = ()> + Clone + 'static,
2105{
2106    type Item = ();
2107
2108    #[doc(hidden)]
2109    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
2110        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + Clone + 'static
2111    {
2112        if is_process_cancelled(&pid, p) {
2113            revoke_process(cont, pid, p)
2114        } else {
2115            let SpawnUsingId { cancellation, comp, comp_id } = self;
2116            match ProcessId::prepare(comp_id.clone(), p) {
2117                Result::Err(e) => Result::Err(e),
2118                Result::Ok(()) => {
2119                    let hs = ProcessId::connect_cancel(pid.clone(), cancellation, comp_id.clone(), p);
2120                    match hs {
2121                        Result::Err(e) => Result::Err(e),
2122                        Result::Ok(hs) => {
2123                            enqueue_event(p.time, {
2124                                cons_event(move |p| {
2125                                    let cont = move |a: simulation::Result<()>, _pid: Grc<ProcessId>, p: &Point| {
2126                                        hs.dispose(p)?;
2127                                        match a {
2128                                            Result::Ok(()) => Result::Ok(()),
2129                                            Result::Err(Error::Cancel) => Result::Ok(()),
2130                                            Result::Err(Error::Other(e)) => Result::Err(Error::Other(e))
2131                                        }
2132                                    };
2133                                    comp.call_process(cont, comp_id, p)
2134                                }).into_boxed()
2135                            }).and_then(move |()| {
2136                                cons_event(move |p| {
2137                                    resume_process(cont, pid, (), p)
2138                                })
2139                            }).call_event(p)
2140                        }
2141                    }
2142                }
2143            }
2144        }
2145    }
2146
2147    #[doc(hidden)]
2148    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
2149        if is_process_cancelled(&pid, p) {
2150            revoke_process_boxed(cont, pid, p)
2151        } else {
2152            let SpawnUsingId { cancellation, comp, comp_id } = self;
2153            match ProcessId::prepare(comp_id.clone(), p) {
2154                Result::Err(e) => Result::Err(e),
2155                Result::Ok(()) => {
2156                    let hs = ProcessId::connect_cancel(pid.clone(), cancellation, comp_id.clone(), p);
2157                    match hs {
2158                        Result::Err(e) => Result::Err(e),
2159                        Result::Ok(hs) => {
2160                            enqueue_event(p.time, {
2161                                cons_event(move |p| {
2162                                    let cont = move |a: simulation::Result<()>, _pid: Grc<ProcessId>, p: &Point| {
2163                                        hs.dispose(p)?;
2164                                        match a {
2165                                            Result::Ok(()) => Result::Ok(()),
2166                                            Result::Err(Error::Cancel) => Result::Ok(()),
2167                                            Result::Err(Error::Other(e)) => Result::Err(Error::Other(e))
2168                                        }
2169                                    };
2170                                    comp.call_process(cont, comp_id, p)
2171                                }).into_boxed()
2172                            }).and_then(move |()| {
2173                                cons_event(move |p| {
2174                                    resume_process_boxed(cont, pid, (), p)
2175                                })
2176                            }).call_event(p)
2177                        }
2178                    }
2179                }
2180            }
2181        }
2182    }
2183}
2184
2185/// Allows spawning another process bound with the current one.
2186#[must_use = "computations are lazy and do nothing unless to be run"]
2187#[derive(Clone)]
2188pub struct Spawn<M>
2189{
2190    /// How to cancel the processes.
2191    cancellation: ProcessCancellation,
2192
2193    /// The computation.
2194    comp: M
2195}
2196
2197impl<M> Process for Spawn<M>
2198    where M: Process<Item = ()> + Clone + 'static
2199{
2200    type Item = ();
2201
2202    #[doc(hidden)]
2203    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
2204        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + Clone + 'static
2205    {
2206        if is_process_cancelled(&pid, p) {
2207            revoke_process(cont, pid, p)
2208        } else {
2209            let Spawn { cancellation, comp } = self;
2210            ProcessId::new()
2211                .into_process()
2212                .and_then(move |pid| {
2213                    spawn_process_using_id_with(cancellation, Grc::new(pid), comp)
2214                })
2215                .call_process(cont, pid, p)
2216        }
2217    }
2218
2219    #[doc(hidden)]
2220    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
2221        if is_process_cancelled(&pid, p) {
2222            revoke_process_boxed(cont, pid, p)
2223        } else {
2224            let Spawn { cancellation, comp } = self;
2225            ProcessId::new()
2226                .into_process()
2227                .and_then(move |pid| {
2228                    spawn_process_using_id_with(cancellation, Grc::new(pid), comp)
2229                })
2230                .call_process_boxed(cont, pid, p)
2231        }
2232    }
2233}
2234
2235/// Allows running another process within the specified timeout by using the given identifier.
2236#[must_use = "computations are lazy and do nothing unless to be run"]
2237#[derive(Clone)]
2238pub struct TimeoutUsingId<M>
2239{
2240    /// Timeout.
2241    timeout: f64,
2242
2243    /// The computation.
2244    comp: M,
2245
2246    /// The child process identifier.
2247    comp_id: Grc<ProcessId>
2248}
2249
2250impl<M> Process for TimeoutUsingId<M>
2251    where M: Process + Clone + 'static,
2252          M::Item: Clone
2253{
2254    type Item = Option<M::Item>;
2255
2256    #[doc(hidden)]
2257    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
2258        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + Clone + 'static
2259    {
2260        if is_process_cancelled(&pid, p) {
2261            revoke_process(cont, pid, p)
2262        } else {
2263            let TimeoutUsingId { timeout, comp, comp_id } = self;
2264            let comp_id_clone = comp_id.clone();
2265            let s = Grc::new(ObservableSource::new());
2266            let s_clone = s.clone();
2267            ProcessId::new()
2268                .into_process()
2269                .and_then(move |timeout_id| {
2270                    let timeout_id = Grc::new(timeout_id);
2271                    spawn_process_using_id_with(ProcessCancellation::CancelChildAfterParent, timeout_id.clone(), {
2272                        hold_process(timeout)
2273                            .and_then(move |()| {
2274                                ProcessId::cancel(comp_id_clone)
2275                                    .into_process()
2276                            })
2277                    })
2278                    .and_then(move |()| {
2279                        let r = Grc::new(RefComp::new(None));
2280                        spawn_process_using_id_with(ProcessCancellation::CancelChildAfterParent, comp_id, {
2281                            let r_clone = r.clone();
2282                            comp.and_then(move |item| {
2283                                    RefComp::write(r_clone, Some(item))
2284                                        .into_process()
2285                                })
2286                                .finally({
2287                                    ProcessId::cancel(timeout_id)
2288                                        .and_then(move |()| {
2289                                            RefComp::read(r)
2290                                                .and_then(move |item| {
2291                                                    s_clone.trigger(item)
2292                                                })
2293                                        })
2294                                        .into_process()
2295                                })
2296                        })
2297                    })
2298                })
2299                .and_then(move |()| {
2300                    process_await(s.publish())
2301                })
2302                .call_process(cont, pid, p)
2303        }
2304    }
2305
2306    #[doc(hidden)]
2307    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
2308        if is_process_cancelled(&pid, p) {
2309            revoke_process_boxed(cont, pid, p)
2310        } else {
2311            let TimeoutUsingId { timeout, comp, comp_id } = self;
2312            let comp_id_clone = comp_id.clone();
2313            let s = Grc::new(ObservableSource::new());
2314            let s_clone = s.clone();
2315            ProcessId::new()
2316                .into_process()
2317                .and_then(move |timeout_id| {
2318                    let timeout_id = Grc::new(timeout_id);
2319                    spawn_process_using_id_with(ProcessCancellation::CancelChildAfterParent, timeout_id.clone(), {
2320                        hold_process(timeout)
2321                            .and_then(move |()| {
2322                                ProcessId::cancel(comp_id_clone)
2323                                    .into_process()
2324                            })
2325                    })
2326                    .and_then(move |()| {
2327                        let r = Grc::new(RefComp::new(None));
2328                        spawn_process_using_id_with(ProcessCancellation::CancelChildAfterParent, comp_id, {
2329                            let r_clone = r.clone();
2330                            comp.and_then(move |item| {
2331                                    RefComp::write(r_clone, Some(item))
2332                                        .into_process()
2333                                })
2334                                .finally({
2335                                    ProcessId::cancel(timeout_id)
2336                                        .and_then(move |()| {
2337                                            RefComp::read(r)
2338                                                .and_then(move |item| {
2339                                                    s_clone.trigger(item)
2340                                                })
2341                                        })
2342                                        .into_process()
2343                                })
2344                        })
2345                    })
2346                })
2347                .and_then(move |()| {
2348                    process_await(s.publish())
2349                })
2350                .call_process_boxed(cont, pid, p)
2351        }
2352    }
2353}
2354
2355/// Allows running another process within the specified timeout.
2356#[must_use = "computations are lazy and do nothing unless to be run"]
2357#[derive(Clone)]
2358pub struct Timeout<M>
2359{
2360    /// Timeout.
2361    timeout: f64,
2362
2363    /// The computation.
2364    comp: M
2365}
2366
2367impl<M> Process for Timeout<M>
2368    where M: Process + Clone + 'static,
2369          M::Item: Clone
2370{
2371    type Item = Option<M::Item>;
2372
2373    #[doc(hidden)]
2374    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
2375        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + Clone + 'static
2376    {
2377        if is_process_cancelled(&pid, p) {
2378            revoke_process(cont, pid, p)
2379        } else {
2380            let Timeout { timeout, comp } = self;
2381            ProcessId::new()
2382                .into_process()
2383                .and_then(move |comp_id| {
2384                    timeout_process_using_id(timeout, Grc::new(comp_id), comp)
2385                })
2386                .call_process(cont, pid, p)
2387        }
2388    }
2389
2390    #[doc(hidden)]
2391    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
2392        if is_process_cancelled(&pid, p) {
2393            revoke_process_boxed(cont, pid, p)
2394        } else {
2395            let Timeout { timeout, comp } = self;
2396            ProcessId::new()
2397                .into_process()
2398                .and_then(move |comp_id| {
2399                    timeout_process_using_id(timeout, Grc::new(comp_id), comp)
2400                })
2401                .call_process_boxed(cont, pid, p)
2402        }
2403    }
2404}
2405
2406/// Represents a temporarily frozen computation.
2407#[doc(hidden)]
2408#[derive(Clone)]
2409pub struct FrozenProcess<T> {
2410
2411    /// The frozen continuation.
2412    comp: EventBox<Option<ProcessBoxCont<T>>>
2413}
2414
2415impl<T> FrozenProcess<T>
2416    where T: 'static
2417{
2418    /// Unfreeze the process computation.
2419    #[doc(hidden)]
2420    #[inline]
2421    pub fn unfreeze(self, p: &Point) -> simulation::Result<Option<ProcessBoxCont<T>>> {
2422        let FrozenProcess { comp } = self;
2423        comp.call_box((p,))
2424    }
2425
2426    /// Freeze the process computation.
2427    #[doc(hidden)]
2428    pub fn new(cont: ProcessBoxCont<T>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<FrozenProcess<T>>
2429        where T: Clone
2430    {
2431        let cont = substitute_process_priority_boxed(p.priority, cont);
2432        let rh: Grc<RefComp<Option<DisposableBox>>> = Grc::new(RefComp::new(None));
2433        let rc = Grc::new(RefComp::new(Some(cont)));
2434        let h  = {
2435            let rh = rh.clone();
2436            let rc = rc.clone();
2437            let weak_pid = Grc::downgrade(&pid);
2438            pid.cancel_initiating()
2439                .subscribe(cons_observer(move |_, p| {
2440                    let pid = weak_pid.upgrade().expect("The process identifier cannot be removed");
2441                    let h = rh.swap_at(None, p).unwrap();
2442                    h.dispose(p)?;
2443                    match rc.swap_at(None, p) {
2444                        None => Result::Ok(()),
2445                        Some(cont) => {
2446                            let pid = pid.clone();
2447                            enqueue_event(p.time, {
2448                                cons_event(move |p| {
2449                                    let z = is_process_cancelled(&pid, p);
2450                                    if z {
2451                                        revoke_process_boxed(cont, pid, p)
2452                                    } else {
2453                                        Result::Ok(())
2454                                    }
2455                                }).into_boxed()
2456                            }).call_event(p)
2457                        }
2458                    }
2459                }))
2460                .call_event(p)
2461        };
2462        match h {
2463            Result::Err(e) => Result::Err(e),
2464            Result::Ok(h) => {
2465                rh.write_at(Some(h), p);
2466                let comp = {
2467                    cons_event(move |p| {
2468                        let h = rh.swap_at(None, p).unwrap();
2469                        h.dispose(p)?;
2470                        let cont = rc.swap_at(None, p);
2471                        Result::Ok(cont)
2472                    }).into_boxed()
2473                };
2474                Result::Ok(FrozenProcess { comp: comp })
2475            }
2476        }
2477    }
2478
2479    /// Freeze the computation parameters specifying what should be done when reentering the computation.
2480    #[doc(hidden)]
2481    pub fn with_reentering<M>(cont: ProcessBoxCont<T>, pid: Grc<ProcessId>, val: T, comp: M, p: &Point) -> simulation::Result<FrozenProcess<T>>
2482        where M: Process<Item = T> + Clone + 'static,
2483              T: Clone
2484    {
2485        let cont = substitute_process_priority_boxed(p.priority, cont);
2486        let rh: Grc<RefComp<Option<DisposableBox>>> = Grc::new(RefComp::new(None));
2487        let rc = Grc::new(RefComp::new(Some(cont)));
2488        let h  = {
2489            let rh = rh.clone();
2490            let rc = rc.clone();
2491            let weak_pid = Grc::downgrade(&pid);
2492            pid.cancel_initiating()
2493                .subscribe(cons_observer(move |_, p| {
2494                    let pid = weak_pid.upgrade().expect("The process identifier cannot be removed");
2495                    let h = rh.swap_at(None, p).unwrap();
2496                    h.dispose(p)?;
2497                    match rc.swap_at(None, p) {
2498                        None => Result::Ok(()),
2499                        Some(cont) => {
2500                            let pid = pid.clone();
2501                            enqueue_event(p.time, {
2502                                cons_event(move |p| {
2503                                    let z = is_process_cancelled(&pid, p);
2504                                    if z {
2505                                        revoke_process_boxed(cont, pid, p)
2506                                    } else {
2507                                        Result::Ok(())
2508                                    }
2509                                }).into_boxed()
2510                            }).call_event(p)
2511                        }
2512                    }
2513                }))
2514                .call_event(p)
2515        };
2516        match h {
2517            Result::Err(e) => Result::Err(e),
2518            Result::Ok(h) => {
2519                rh.write_at(Some(h), p);
2520                let comp = {
2521                    cons_event(move |p| {
2522                        let h = rh.swap_at(None, p).unwrap();
2523                        h.dispose(p)?;
2524                        match rc.swap_at(None, p) {
2525                            None => Result::Ok(None),
2526                            Some(cont) => {
2527                                let f = pid.is_preempted(p);
2528                                if !f  {
2529                                    Result::Ok(Some(cont))
2530                                } else {
2531                                    let cont = ProcessBoxCont::new(move |a, pid, p| {
2532                                        match a {
2533                                            Result::Ok(_) => comp.call_process_boxed(cont, pid, p),
2534                                            Result::Err(e) => cont.call_box((Result::Err(e), pid, p))
2535                                        }
2536                                    });
2537                                    match sleep_process(cont, pid, val, p) {
2538                                        Result::Ok(()) => Result::Ok(None),
2539                                        Result::Err(e) => Result::Err(e)
2540                                    }
2541                                }
2542                            }
2543                        }
2544                    }).into_boxed()
2545                };
2546                Result::Ok(FrozenProcess { comp: comp })
2547            }
2548        }
2549    }
2550}
2551
2552/// Reenter the process computation.
2553#[doc(hidden)]
2554pub fn reenter_process<T>(cont: ProcessBoxCont<T>, pid: Grc<ProcessId>, val: T, p: &Point) -> simulation::Result<()>
2555    where T: Clone + 'static
2556{
2557    let f = pid.is_preempted(p);
2558    if !f {
2559        enqueue_event(p.time, {
2560            cons_event(move |p| {
2561                let f = pid.is_preempted(p);
2562                if !f {
2563                    resume_process_boxed(cont, pid, val, p)
2564                } else {
2565                    sleep_process(cont, pid, val, p)
2566                }
2567            }).into_boxed()
2568        }).call_event(p)
2569    } else {
2570        sleep_process(cont, pid, val, p)
2571    }
2572}
2573
2574/// Sleep until the preempted computation will be reentered.
2575#[doc(hidden)]
2576pub fn sleep_process<T>(cont: ProcessBoxCont<T>, pid: Grc<ProcessId>, val: T, p: &Point) -> simulation::Result<()>
2577    where T: Clone + 'static
2578{
2579    let rh: Grc<RefComp<Option<DisposableBox>>> = Grc::new(RefComp::new(None));
2580    let rc = Grc::new(RefComp::new(Some(cont)));
2581    let rv = Grc::new(RefComp::new(Some(val)));
2582    let h  = {
2583        let rh = rh.clone();
2584        pid.observable()
2585            .subscribe(cons_observer(move |e, p| {
2586                let h = rh.swap_at(None, p).unwrap();
2587                h.dispose(p)?;
2588                match e {
2589                    &ProcessEvent::CancelInitiating => {
2590                        let pid = pid.clone();
2591                        let rc  = rc.clone();
2592                        enqueue_event(p.time, {
2593                            cons_event(move |p| {
2594                                let z = is_process_cancelled(&pid, p);
2595                                if z {
2596                                    let cont = rc.swap_at(None, p).unwrap();
2597                                    revoke_process_boxed(cont, pid, p)
2598                                } else {
2599                                    Result::Ok(())
2600                                }
2601                            }).into_boxed()
2602                        }).call_event(p)
2603                    },
2604                    &ProcessEvent::PreemptionEnding => {
2605                        let pid = pid.clone();
2606                        let rc  = rc.clone();
2607                        let rv  = rv.clone();
2608                        enqueue_event(p.time, {
2609                            cons_event(move |p| {
2610                                let cont = rc.swap_at(None, p).unwrap();
2611                                let val  = rv.swap_at(None, p).unwrap();
2612                                reenter_process(cont, pid, val, p)
2613                            }).into_boxed()
2614                        }).call_event(p)
2615                    },
2616                    &ProcessEvent::PreemptionInitiating => {
2617                        panic!("The computation was already preempted")
2618                    }
2619                }
2620            }))
2621            .call_event(p)
2622    };
2623    match h {
2624        Result::Err(e) => Result::Err(e),
2625        Result::Ok(h) => {
2626            rh.write_at(Some(h), p);
2627            Result::Ok(())
2628        }
2629    }
2630}
2631
2632/// Substitute the process computation.
2633#[doc(hidden)]
2634pub fn substitute_process<C, T, F>(cont: C, f: F) -> ProcessBoxCont<T>
2635    where C: FnOnce(simulation::Result<T>, Grc<ProcessId>, &Point) -> simulation::Result<()> + Clone + 'static,
2636          T: 'static,
2637          F: FnOnce(C, Grc<ProcessId>, T, &Point) -> simulation::Result<()> + Clone + 'static,
2638{
2639    ProcessBoxCont::new(move |a, pid, p| {
2640        match a {
2641            Result::Ok(a) => f(cont, pid, a, p),
2642            Result::Err(e) => cont(Result::Err(e), pid, p)
2643        }
2644    })
2645}
2646
2647/// Substitute the process computation.
2648#[doc(hidden)]
2649pub fn substitute_process_boxed<T, F>(cont: ProcessBoxCont<T>, f: F) -> ProcessBoxCont<T>
2650    where T: 'static,
2651          F: FnOnce(ProcessBoxCont<T>, Grc<ProcessId>, T, &Point) -> simulation::Result<()> + Clone + 'static
2652{
2653    ProcessBoxCont::new(move |a, pid, p| {
2654        match a {
2655            Result::Ok(a) => f(cont, pid, a, p),
2656            Result::Err(e) => cont.call_box((Result::Err(e), pid, p))
2657        }
2658    })
2659}
2660
2661/// Substitute the process priority.
2662#[doc(hidden)]
2663pub fn substitute_process_priority<C, T>(priority: isize, cont: C) -> ProcessBoxCont<T>
2664    where C: FnOnce(simulation::Result<T>, Grc<ProcessId>, &Point) -> simulation::Result<()> + Clone + 'static,
2665          T: Clone + 'static
2666{
2667    ProcessBoxCont::new(move |a, pid, p| {
2668        match a {
2669            Result::Ok(a) => {
2670                if p.priority == priority {
2671                    cont(Result::Ok(a), pid, p)
2672                } else {
2673                    enqueue_event_with_priority(p.time, priority, {
2674                        cons_event(move |p| {
2675                            resume_process(cont, pid, a, p)
2676                        }).into_boxed()
2677                    }).call_event(p)
2678                }
2679            },
2680            Result::Err(e) => {
2681                cont(Result::Err(e), pid, p)
2682            }
2683        }
2684    })
2685}
2686
2687/// Substitute the process priority.
2688#[doc(hidden)]
2689pub fn substitute_process_priority_boxed<T>(priority: isize, cont: ProcessBoxCont<T>) -> ProcessBoxCont<T>
2690    where T: Clone + 'static
2691{
2692    ProcessBoxCont::new(move |a, pid, p| {
2693        match a {
2694            Result::Ok(a) => {
2695                if p.priority == priority {
2696                    cont.call_box((Result::Ok(a), pid, p))
2697                } else {
2698                    enqueue_event_with_priority(p.time, priority, {
2699                        cons_event(move |p| {
2700                            resume_process_boxed(cont, pid, a, p)
2701                        }).into_boxed()
2702                    }).call_event(p)
2703                }
2704            },
2705            Result::Err(e) => {
2706                cont.call_box((Result::Err(e), pid, p))
2707            }
2708        }
2709    })
2710}
2711
2712/// Await a signal that should be emitted by the specified observable.
2713#[must_use = "computations are lazy and do nothing unless to be run"]
2714#[derive(Clone)]
2715pub struct Await<O, M> {
2716
2717    /// The observable.
2718    observable: O,
2719
2720    /// To keep the type parameter.
2721    _phantom: PhantomData<M>
2722}
2723
2724impl<O, M> Process for Await<O, M>
2725    where O: Observable<Message = M>,
2726          M: Clone + 'static
2727{
2728    type Item = M;
2729
2730    #[doc(hidden)]
2731    #[inline]
2732    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
2733        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + Clone + 'static
2734    {
2735        self.call_process_boxed(ProcessBoxCont::new(cont), pid, p)
2736    }
2737
2738    #[doc(hidden)]
2739    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
2740        match FrozenProcess::new(cont, pid.clone(), p) {
2741            Result::Err(e) => Result::Err(e),
2742            Result::Ok(cont) => {
2743                let rh: Grc<RefComp<Option<DisposableBox>>> = Grc::new(RefComp::new(None));
2744                let rh2: Grc<RefComp<Option<DisposableBox>>> = Grc::new(RefComp::new(None));
2745                let rc = RefComp::new(Some(cont));
2746                let h  = {
2747                    let rh = rh.clone();
2748                    let rh2 = rh2.clone();
2749                    let pid = pid.clone();
2750                    let Await { observable, _phantom } = self;
2751                    observable
2752                        .subscribe(cons_observer(move |x: &M, p| {
2753                            if let Some(h) = rh.swap_at(None, p) {
2754                                h.dispose(p)?;
2755                            }                            
2756                            if let Some(h2) = rh2.swap_at(None, p) {
2757                                h2.dispose(p)?;
2758                            }                            
2759                            let cont = rc.swap_at(None, p).unwrap();
2760                            match cont.unfreeze(p) {
2761                                Result::Err(e) => Result::Err(e),
2762                                Result::Ok(None) => Result::Ok(()),
2763                                Result::Ok(Some(cont)) => {
2764                                    let pid = pid.clone();
2765                                    reenter_process(cont, pid, x.clone(), p)
2766                                }
2767                            }
2768                        }))
2769                        .call_event(p)
2770                };
2771                match h {
2772                    Result::Err(e) => return Result::Err(e),
2773                    Result::Ok(h) => {
2774                        let h2 = {
2775                            let rh = rh.clone();
2776                            let rh2 = rh2.clone();
2777                            pid.cancel_initiating()
2778                                .subscribe(cons_observer(move |_, p| {
2779                                    if let Some(h) = rh.swap_at(None, p) {
2780                                        h.dispose(p)?;
2781                                    }                            
2782                                    if let Some(h2) = rh2.swap_at(None, p) {
2783                                        h2.dispose(p)?;
2784                                    }
2785                                    Result::Ok(())
2786                                }))
2787                                .call_event(p)
2788                        };
2789                        match h2 {
2790                            Result::Err(e) => {
2791                                h.dispose(p)?;
2792                                return Result::Err(e)
2793                            },
2794                            Result::Ok(h2) => {
2795                                rh.write_at(Some(h), p);
2796                                rh2.write_at(Some(h2), p);
2797                                Result::Ok(())
2798                            }
2799                        }
2800                    }
2801                }
2802            }
2803        }
2804    }
2805}
2806
2807/// Await a cancellable signal that should be emitted by the specified observable.
2808#[must_use = "computations are lazy and do nothing unless to be run"]
2809#[derive(Clone)]
2810pub struct AwaitResult<O, M> {
2811
2812    /// The observable.
2813    observable: O,
2814
2815    /// To keep the type parameter.
2816    _phantom: PhantomData<M>
2817}
2818
2819impl<O, M> Process for AwaitResult<O, M>
2820    where O: Observable<Message = simulation::Result<M>>,
2821          M: Clone + 'static
2822{
2823    type Item = M;
2824
2825    #[doc(hidden)]
2826    #[inline]
2827    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
2828        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + Clone + 'static
2829    {
2830        self.call_process_boxed(ProcessBoxCont::new(cont), pid, p)
2831    }
2832
2833    #[doc(hidden)]
2834    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
2835        match FrozenProcess::new(cont, pid.clone(), p) {
2836            Result::Err(e) => Result::Err(e),
2837            Result::Ok(cont) => {
2838                let rh: Grc<RefComp<Option<DisposableBox>>> = Grc::new(RefComp::new(None));
2839                let rh2: Grc<RefComp<Option<DisposableBox>>> = Grc::new(RefComp::new(None));
2840                let rc = RefComp::new(Some(cont));
2841                let h  = {
2842                    let rh = rh.clone();
2843                    let rh2 = rh2.clone();
2844                    let pid = pid.clone();
2845                    let AwaitResult { observable, _phantom } = self;
2846                    observable
2847                        .subscribe(cons_observer(move |x: &simulation::Result<M>, p| {
2848                            if let Some(h) = rh.swap_at(None, p) {
2849                                h.dispose(p)?;
2850                            }                            
2851                            if let Some(h2) = rh2.swap_at(None, p) {
2852                                h2.dispose(p)?;
2853                            }                            
2854                            let cont = rc.swap_at(None, p).unwrap();
2855                            match cont.unfreeze(p) {
2856                                Result::Err(e) => Result::Err(e),
2857                                Result::Ok(None) => Result::Ok(()),
2858                                Result::Ok(Some(cont)) => {
2859                                    let pid = pid.clone();
2860                                    match x {
2861                                        Result::Ok(x) => {
2862                                            reenter_process(cont, pid, x.clone(), p)
2863                                        },
2864                                        Result::Err(Error::Cancel) => {
2865                                            revoke_process_boxed(cont, pid, p)
2866                                        },
2867                                        Result::Err(Error::Other(e)) => {
2868                                            match e.deref() {
2869                                                &OtherError::Retry(_) => {
2870                                                    cut_error_process_boxed(cont, pid, e.clone(), p)
2871                                                },
2872                                                &OtherError::Panic(_) => {
2873                                                    cut_error_process_boxed(cont, pid, e.clone(), p)
2874                                                },
2875                                                &OtherError::IO(_) => {
2876                                                    propagate_error_process_boxed(cont, pid, e.clone(), p)
2877                                                }
2878                                            }
2879                                        }
2880                                    }
2881                                }
2882                            }
2883                        }))
2884                        .call_event(p)
2885                };
2886                match h {
2887                    Result::Err(e) => return Result::Err(e),
2888                    Result::Ok(h) => {
2889                        let h2 = {
2890                            let rh = rh.clone();
2891                            let rh2 = rh2.clone();
2892                            pid.cancel_initiating()
2893                                .subscribe(cons_observer(move |_, p| {
2894                                    if let Some(h) = rh.swap_at(None, p) {
2895                                        h.dispose(p)?;
2896                                    }                            
2897                                    if let Some(h2) = rh2.swap_at(None, p) {
2898                                        h2.dispose(p)?;
2899                                    }
2900                                    Result::Ok(())
2901                                }))
2902                                .call_event(p)
2903                        };
2904                        match h2 {
2905                            Result::Err(e) => {
2906                                h.dispose(p)?;
2907                                return Result::Err(e)
2908                            },
2909                            Result::Ok(h2) => {
2910                                rh.write_at(Some(h), p);
2911                                rh2.write_at(Some(h2), p);
2912                                Result::Ok(())
2913                            }
2914                        }
2915                    }
2916                }
2917            }
2918        }
2919    }
2920}
2921
2922/// Like the GoTo statement it transfers the direction of computation.
2923#[must_use = "computations are lazy and do nothing unless to be run"]
2924#[derive(Clone)]
2925pub struct Transfer<T, M> {
2926
2927    /// The computation to transfer the flow control to.
2928    comp: M,
2929
2930    /// To keep the type parameter.
2931    _phantom: PhantomData<T>
2932}
2933
2934impl<T, M> Process for Transfer<T, M>
2935    where M: Process<Item = ()>
2936{
2937    type Item = T;
2938
2939    #[doc(hidden)]
2940    #[inline]
2941    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
2942        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + Clone + 'static
2943    {
2944        if is_process_cancelled(&pid, p) {
2945            revoke_process(cont, pid, p)
2946        } else {
2947            let Transfer { comp, _phantom } = self;
2948            fn cont(a: simulation::Result<()>, _pid: Grc<ProcessId>, _p: &Point) -> simulation::Result<()> {
2949                a
2950            }
2951            comp.call_process(cont, pid, p)
2952        }
2953    }
2954
2955    #[doc(hidden)]
2956    #[inline]
2957    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
2958        if is_process_cancelled(&pid, p) {
2959            revoke_process_boxed(cont, pid, p)
2960        } else {
2961            let Transfer { comp, _phantom } = self;
2962            fn cont(a: simulation::Result<()>, _pid: Grc<ProcessId>, _p: &Point) -> simulation::Result<()> {
2963                a
2964            }
2965            comp.call_process(cont, pid, p)
2966        }
2967    }
2968}
2969
2970/// The process that never returns a result.
2971#[must_use = "computations are lazy and do nothing unless to be run"]
2972#[derive(Clone)]
2973pub struct Never<T> {
2974
2975    /// To keep the type parameter.
2976    _phantom: PhantomData<T>
2977}
2978
2979impl<T> Process for Never<T> {
2980
2981    type Item = T;
2982
2983    #[doc(hidden)]
2984    #[inline]
2985    fn call_process<C>(self, _cont: C, _pid: Grc<ProcessId>, _p: &Point) -> simulation::Result<()>
2986        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + Clone + 'static
2987    {
2988        Result::Ok(())
2989    }
2990
2991    #[doc(hidden)]
2992    #[inline]
2993    fn call_process_boxed(self, _cont: ProcessBoxCont<Self::Item>, _pid: Grc<ProcessId>, _p: &Point) -> simulation::Result<()> {
2994        Result::Ok(())
2995    }
2996}
2997
2998/// Register a handler that will be invoked in case of cancelling the current process.
2999#[must_use = "computations are lazy and do nothing unless to be run"]
3000#[derive(Clone)]
3001pub struct WhenCancelling<M> {
3002
3003    /// The computation.
3004    comp: M
3005}
3006
3007impl<M> Process for WhenCancelling<M>
3008    where M: Observer<Message = (), Item = ()> + Clone + 'static
3009{
3010    type Item = ();
3011
3012    #[doc(hidden)]
3013    #[inline]
3014    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
3015        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + Clone + 'static
3016    {
3017        if is_process_cancelled(&pid, p) {
3018            revoke_process(cont, pid, p)
3019        } else {
3020            let WhenCancelling { comp } = self;
3021            pid.cancel_initiating()
3022                .subscribe(cons_observer(move |_, p| {
3023                    comp.call_observer(&(), p)
3024                }))
3025                .call_event(p)?;
3026            resume_process(cont, pid, (), p)
3027        }
3028    }
3029
3030    #[doc(hidden)]
3031    #[inline]
3032    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
3033        if is_process_cancelled(&pid, p) {
3034            revoke_process_boxed(cont, pid, p)
3035        } else {
3036            let WhenCancelling { comp } = self;
3037            pid.cancel_initiating()
3038                .subscribe(cons_observer(move |_, p| {
3039                    comp.call_observer(&(), p)
3040                }))
3041                .call_event(p)?;
3042            resume_process_boxed(cont, pid, (), p)
3043        }
3044    }
3045}
3046
3047/// Allows creating the `Process` computation that panics with the specified message.
3048#[must_use = "computations are lazy and do nothing unless to be run"]
3049#[derive(Clone)]
3050pub struct Panic<T> {
3051
3052    /// The panic message.
3053    msg: String,
3054
3055    /// Keep the type parameter.
3056    _phantom: PhantomData<T>
3057}
3058
3059impl<T> Process for Panic<T> {
3060
3061    type Item = T;
3062
3063    #[doc(hidden)]
3064    #[inline]
3065    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
3066        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + Clone + 'static
3067    {
3068        if is_process_cancelled(&pid, p) {
3069            revoke_process(cont, pid, p)
3070        } else {
3071            let Panic { msg, _phantom } = self;
3072            let err = Rc::new(OtherError::Panic(msg));
3073            cut_error_process(cont, pid, err, p)
3074        }
3075    }
3076
3077    #[doc(hidden)]
3078    #[inline]
3079    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
3080        if is_process_cancelled(&pid, p) {
3081            revoke_process_boxed(cont, pid, p)
3082        } else {
3083            let Panic { msg, _phantom } = self;
3084            let err = Rc::new(OtherError::Panic(msg));
3085            cut_error_process_boxed(cont, pid, err, p)
3086        }
3087    }
3088}
3089
3090/// Trace the computation.
3091#[must_use = "computations are lazy and do nothing unless to be run"]
3092#[derive(Clone)]
3093pub struct Trace<M> {
3094
3095    /// The computation.
3096    comp: M,
3097
3098    /// The message to print.
3099    msg: String
3100}
3101
3102impl<M> Process for Trace<M>
3103    where M: Process
3104{
3105    type Item = M::Item;
3106
3107    #[doc(hidden)]
3108    #[inline]
3109    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
3110        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + Clone + 'static
3111    {
3112        if is_process_cancelled(&pid, p) {
3113            revoke_process(cont, pid, p)
3114        } else {
3115            let Trace { comp, msg } = self;
3116            p.trace(&msg);
3117            comp.call_process(cont, pid, p)
3118        }
3119    }
3120
3121    #[doc(hidden)]
3122    #[inline]
3123    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
3124        if is_process_cancelled(&pid, p) {
3125            revoke_process_boxed(cont, pid, p)
3126        } else {
3127            let Trace { comp, msg } = self;
3128            p.trace(&msg);
3129            comp.call_process_boxed(cont, pid, p)
3130        }
3131    }
3132}
3133
3134#[must_use = "computations are lazy and do nothing unless to be run"]
3135#[derive(Clone)]
3136pub struct ProcessWithPriority {
3137
3138    /// The time point priority.
3139    priority: isize
3140}
3141
3142impl Process for ProcessWithPriority {
3143
3144    type Item = ();
3145
3146    #[doc(hidden)]
3147    #[inline]
3148    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
3149        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + Clone + 'static
3150    {
3151        if is_process_cancelled(&pid, p) {
3152            revoke_process(cont, pid, p)
3153        } else {
3154            let ProcessWithPriority { priority } = self;
3155            if priority == p.priority {
3156                cont(Result::Ok(()), pid, p)
3157            } else {
3158                let comp = cons_event(move |p| {
3159                    if is_process_cancelled(&pid, p) {
3160                        revoke_process(cont, pid, p)
3161                    } else {
3162                        cont(Result::Ok(()), pid, p)
3163                    }
3164                });
3165                enqueue_event_with_priority(p.time, priority, comp.into_boxed())
3166                    .call_event(p)
3167            }
3168        }
3169    }
3170
3171    #[doc(hidden)]
3172    #[inline]
3173    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
3174        if is_process_cancelled(&pid, p) {
3175            revoke_process_boxed(cont, pid, p)
3176        } else {
3177            let ProcessWithPriority { priority } = self;
3178            if priority == p.priority {
3179                cont.call_box((Result::Ok(()), pid, p))
3180            } else {
3181                let comp = cons_event(move |p| {
3182                    if is_process_cancelled(&pid, p) {
3183                        revoke_process_boxed(cont, pid, p)
3184                    } else {
3185                        cont.call_box((Result::Ok(()), pid, p))
3186                    }
3187                });
3188                enqueue_event_with_priority(p.time, priority, comp.into_boxed())
3189                    .call_event(p)
3190            }
3191        }
3192    }
3193}
3194
3195/// Embeds the result value within the `Process` computation.
3196#[must_use = "computations are lazy and do nothing unless to be run"]
3197#[derive(Clone)]
3198pub struct EmbedResult<T> {
3199
3200    /// The value to be embedded.
3201    val: simulation::Result<T>
3202}
3203
3204impl<T> Process for EmbedResult<T> {
3205
3206    type Item = T;
3207
3208    #[doc(hidden)]
3209    #[inline]
3210    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
3211        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + Clone + 'static
3212    {
3213        if is_process_cancelled(&pid, p) {
3214            revoke_process(cont, pid, p)
3215        } else {
3216            let EmbedResult { val } = self;
3217            match val {
3218                Result::Ok(a) => {
3219                    cont(Result::Ok(a), pid, p)
3220                },
3221                Result::Err(Error::Cancel) => {
3222                    let comp = cancel_process();
3223                    comp.call_process(cont, pid, p)
3224                },
3225                Result::Err(Error::Other(e)) => {
3226                    match e.deref() {
3227                        &OtherError::Retry(_) => {
3228                            cut_error_process(cont, pid, e, p)
3229                        },
3230                        &OtherError::Panic(_) => {
3231                            cut_error_process(cont, pid, e, p)
3232                        },
3233                        &OtherError::IO(_) => {
3234                            propagate_error_process(cont, pid, e, p)
3235                        }
3236                    }
3237                }
3238            }
3239        }
3240    }
3241
3242    #[doc(hidden)]
3243    #[inline]
3244    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
3245        if is_process_cancelled(&pid, p) {
3246            revoke_process_boxed(cont, pid, p)
3247        } else {
3248            let EmbedResult { val } = self;
3249            match val {
3250                Result::Ok(a) => {
3251                    cont.call_box((Result::Ok(a), pid, p))
3252                },
3253                Result::Err(Error::Cancel) => {
3254                    let comp = cancel_process();
3255                    comp.call_process_boxed(cont, pid, p)
3256                },
3257                Result::Err(Error::Other(e)) => {
3258                    match e.deref() {
3259                        &OtherError::Retry(_) => {
3260                            cut_error_process_boxed(cont, pid, e, p)
3261                        },
3262                        &OtherError::Panic(_) => {
3263                            cut_error_process_boxed(cont, pid, e, p)
3264                        },
3265                        &OtherError::IO(_) => {
3266                            propagate_error_process_boxed(cont, pid, e, p)
3267                        }
3268                    }
3269                }
3270            }
3271        }
3272    }
3273}