use std::rc::Rc;
use std::marker::PhantomData;
use std::ops::Deref;
use crate::simulation;
use crate::simulation::Point;
use crate::simulation::error::*;
use crate::simulation::event::*;
use crate::simulation::{Run as SimulationRun};
use crate::simulation::simulation::Simulation;
use crate::simulation::observable::*;
use crate::simulation::observable::observer::*;
use crate::simulation::observable::source::*;
use crate::simulation::observable::disposable::*;
use crate::simulation::ref_comp::RefComp;
use dvcompute_utils::grc::Grc;
pub mod random;
pub mod ops;
/// Wraps an already computed value into a process computation.
///
/// When run, the resulting [`Return`] hands `val` to the continuation unless
/// the process has been cancelled in the meantime.
#[inline]
pub fn return_process<T>(val: T) -> Return<T> {
    Return { val }
}
/// Builds a `Panic` computation that carries the message `msg`.
///
/// NOTE(review): the `Panic` implementation is not visible here; presumably
/// running it fails the process with this message.
#[inline]
pub fn panic_process<T>(msg: String) -> Panic<T> {
    Panic { msg, _phantom: PhantomData }
}
/// Postpones the construction of a process until the moment it is run.
///
/// The closure `f` is invoked only when the returned [`Delay`] is executed,
/// and the process it produces is then run in its place.
#[inline]
pub fn delay_process<F, M>(f: F) -> Delay<F, M>
    where F: FnOnce() -> M + 'static,
          M: Process + 'static
{
    Delay { f, _phantom: PhantomData }
}
/// Returns a computation that yields the identifier of the process it runs in.
#[inline]
pub fn process_id() -> Id {
Id {}
}
/// Returns a computation that initiates cancellation of the current process.
///
/// The type parameter `T` is only the nominal result type of the computation.
#[inline]
pub fn cancel_process<T>() -> Cancel<T> {
    Cancel { _phantom: PhantomData::<T> }
}
/// Returns a computation that suspends the current process for `dt` units of
/// modeling time.
///
/// The process is scheduled to resume at the current time plus `dt`.
#[inline]
pub fn hold_process(dt: f64) -> Hold {
    Hold { dt }
}
/// Constructs the `Passivate` computation.
///
/// NOTE(review): the `Passivate` implementation is outside this part of the
/// file; presumably it suspends the process until it is reactivated.
#[inline]
pub fn passivate_process() -> Passivate {
Passivate {}
}
/// Constructs the `PassivateBefore` computation around the event `comp`.
///
/// NOTE(review): implementation not visible here; presumably the process is
/// passivated and `comp` is run as part of that step — confirm.
#[inline]
pub fn passivate_process_before<M>(comp: M) -> PassivateBefore<M>
    where M: Event<Item = ()>
{
    PassivateBefore { comp }
}
/// Constructs a `Loop` computation from a closure that produces unit processes.
///
/// NOTE(review): `Loop` is implemented elsewhere; presumably `f` is called to
/// obtain the body for every iteration.
#[inline]
pub fn loop_process<F, M>(f: F) -> Loop<F, M>
    where F: Fn() -> M + 'static,
          M: Process<Item = ()> + 'static
{
    Loop { f, _phantom: PhantomData }
}
#[inline]
pub fn spawn_process<M>(comp: M) -> Spawn<M>
where M: Process<Item = ()> + 'static
{
spawn_process_with(ProcessCancellation::CancelTogether, comp)
}
#[inline]
pub fn spawn_process_using_id<M>(pid: Grc<ProcessId>, comp: M) -> SpawnUsingId<M>
where M: Process<Item = ()> + 'static
{
spawn_process_using_id_with(ProcessCancellation::CancelTogether, pid, comp)
}
/// Spawns `comp` as a child process using the given cancellation policy.
#[inline]
pub fn spawn_process_with<M>(cancellation: ProcessCancellation, comp: M) -> Spawn<M>
    where M: Process<Item = ()> + 'static
{
    Spawn { cancellation, comp }
}
/// Spawns `comp` as a child process under the identifier `pid`, using the
/// given cancellation policy.
#[inline]
pub fn spawn_process_using_id_with<M>(cancellation: ProcessCancellation, pid: Grc<ProcessId>, comp: M) -> SpawnUsingId<M>
    where M: Process<Item = ()> + 'static
{
    SpawnUsingId { cancellation, comp, comp_id: pid }
}
/// Constructs an `Await` computation over `observable`.
///
/// NOTE(review): `Await` is implemented elsewhere; presumably the process
/// waits for a message from the observable and yields it.
#[inline]
pub fn process_await<O>(observable: O) -> Await<O, O::Message>
    where O: Observable,
          O::Message: Clone + 'static
{
    Await { observable, _phantom: PhantomData }
}
/// Constructs an `AwaitResult` computation over an observable whose messages
/// are `simulation::Result<M>` values.
///
/// NOTE(review): `AwaitResult` is implemented elsewhere; presumably the
/// awaited result is unwrapped into the process outcome.
#[inline]
pub fn process_await_result<O, M>(observable: O) -> AwaitResult<O, M>
    where O: Observable<Message = simulation::Result<M>>,
          M: Clone + 'static
{
    AwaitResult { observable, _phantom: PhantomData }
}
/// Constructs a `Timeout` computation that pairs `comp` with the `timeout`
/// value.
///
/// NOTE(review): `Timeout` is implemented elsewhere; presumably `comp` is
/// given at most `timeout` units of modeling time to finish.
#[inline]
pub fn timeout_process<M>(timeout: f64, comp: M) -> Timeout<M>
    where M: Process + 'static,
          M::Item: Clone
{
    Timeout { timeout, comp }
}
/// Same as `timeout_process`, but the supplied identifier `pid` is stored for
/// the nested computation.
#[inline]
pub fn timeout_process_using_id<M>(timeout: f64, pid: Grc<ProcessId>, comp: M) -> TimeoutUsingId<M>
    where M: Process + 'static,
          M::Item: Clone
{
    TimeoutUsingId { timeout, comp, comp_id: pid }
}
/// Constructs a `Transfer` computation around the unit process `comp`.
///
/// NOTE(review): `Transfer` is implemented elsewhere; presumably control is
/// handed over to `comp` and never returns, hence the free result type `T`.
#[inline]
pub fn transfer_process<T, M>(comp: M) -> Transfer<T, M>
    where M: Process<Item = ()>
{
    Transfer { comp, _phantom: PhantomData }
}
/// Constructs the `Never` computation with the nominal result type `T`.
///
/// NOTE(review): `Never` is implemented elsewhere; presumably it never
/// produces a result.
#[inline]
pub fn never_process<T>() -> Never<T> {
    Never { _phantom: PhantomData::<T> }
}
/// Constructs a `WhenCancelling` computation around the observer `comp`.
///
/// NOTE(review): `WhenCancelling` is implemented elsewhere; presumably `comp`
/// is registered to be notified when the process is being cancelled.
#[inline]
pub fn when_cancelling_process<M>(comp: M) -> WhenCancelling<M>
    where M: Observer<Message = (), Item = ()> + 'static
{
    WhenCancelling { comp }
}
/// Wraps an already available `simulation::Result` into an `EmbedResult`
/// process computation.
#[inline]
pub fn embed_result_process<T>(val: simulation::Result<T>) -> EmbedResult<T> {
    EmbedResult { val }
}
/// Builds a `Sequence` computation from a collection of processes, together
/// with an accumulator vector for their results.
///
/// The accumulator is pre-allocated from the *lower* bound of the iterator's
/// `size_hint`. The upper bound is only a limit and may be far larger than the
/// real number of items (a filtered range reports `Some(usize::MAX)`), so
/// reserving by it could over-allocate massively or panic with a capacity
/// overflow. For exact-size iterators both bounds coincide.
#[inline]
pub fn process_sequence<I, M>(comps: I) -> Sequence<I::IntoIter, M>
    where I: IntoIterator<Item = M> + 'static,
          M: Process + 'static
{
    let comps = comps.into_iter();
    let (lower, _) = comps.size_hint();
    let acc = Vec::with_capacity(lower);
    Sequence { comps, acc }
}
/// Builds a `Sequence_` computation from a collection of processes.
///
/// Unlike `process_sequence`, no accumulator for results is created here.
#[inline]
pub fn process_sequence_<I, M>(comps: I) -> Sequence_<I::IntoIter, M>
    where I: IntoIterator<Item = M> + 'static,
          M: Process + 'static
{
    let comps = comps.into_iter();
    Sequence_ { comps, _phantom: PhantomData }
}
/// Constructs a `ProcessWithPriority` computation for the given priority.
///
/// NOTE(review): implemented elsewhere; presumably the current process
/// continues with `priority` afterwards.
#[inline]
pub fn process_with_priority(priority: isize) -> ProcessWithPriority {
    ProcessWithPriority { priority }
}
/// Runs `process_with_priority(priority)` first and then continues with
/// `comp`, yielding the result of `comp`.
#[inline]
pub fn restore_process_priority<M>(priority: isize, comp: M) -> impl Process<Item = M::Item>
    where M: Process + 'static
{
    let set_priority = process_with_priority(priority);
    set_priority.and_then(move |()| comp)
}
/// Constructs a `Trace` computation that pairs `comp` with the message `msg`.
///
/// NOTE(review): `Trace` is implemented elsewhere; presumably the message is
/// emitted for debugging when `comp` is run.
#[inline]
pub fn trace_process<M>(msg: String, comp: M) -> Trace<M>
    where M: Process
{
    Trace { comp, msg }
}
#[doc(hidden)]
#[inline]
pub fn is_process_cancelled(pid: &ProcessId, p: &Point) -> bool {
pid.is_cancel_activated(p)
}
#[doc(hidden)]
pub fn revoke_process<T, C>(cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
where C: FnOnce(simulation::Result<T>, Grc<ProcessId>, &Point) -> simulation::Result<()>
{
pid.deactivate_cancel(p);
cont(Result::Err(Error::Cancel), pid, p)
}
#[doc(hidden)]
pub fn revoke_process_boxed<T>(cont: ProcessBoxCont<T>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
pid.deactivate_cancel(p);
cont.call_box((Result::Err(Error::Cancel), pid, p))
}
#[doc(hidden)]
pub fn cut_error_process<T, C>(_cont: C, _pid: Grc<ProcessId>, err: Rc<OtherError>, _p: &Point) -> simulation::Result<()>
where C: FnOnce(simulation::Result<T>, Grc<ProcessId>, &Point) -> simulation::Result<()>
{
Result::Err(Error::Other(err))
}
#[doc(hidden)]
pub fn cut_error_process_boxed<T>(_cont: ProcessBoxCont<T>, _pid: Grc<ProcessId>, err: Rc<OtherError>, _p: &Point) -> simulation::Result<()> {
Result::Err(Error::Other(err))
}
#[doc(hidden)]
pub fn propagate_error_process<T, C>(cont: C, pid: Grc<ProcessId>, err: Rc<OtherError>, p: &Point) -> simulation::Result<()>
where C: FnOnce(simulation::Result<T>, Grc<ProcessId>, &Point) -> simulation::Result<()>
{
cont(Result::Err(Error::Other(err)), pid, p)
}
#[doc(hidden)]
pub fn propagate_error_process_boxed<T>(cont: ProcessBoxCont<T>, pid: Grc<ProcessId>, err: Rc<OtherError>, p: &Point) -> simulation::Result<()> {
cont.call_box((Result::Err(Error::Other(err)), pid, p))
}
/// Resumes a process with the value `t`.
///
/// If cancellation is activated for `pid`, the process is revoked instead and
/// the continuation receives `Error::Cancel`.
#[doc(hidden)]
#[inline]
pub fn resume_process<T, C>(cont: C, pid: Grc<ProcessId>, t: T, p: &Point) -> simulation::Result<()>
    where C: FnOnce(simulation::Result<T>, Grc<ProcessId>, &Point) -> simulation::Result<()>
{
    if !is_process_cancelled(&pid, p) {
        return cont(Result::Ok(t), pid, p);
    }
    revoke_process(cont, pid, p)
}
/// Boxed-continuation variant of `resume_process`.
///
/// If cancellation is activated for `pid`, the process is revoked instead and
/// the continuation receives `Error::Cancel`.
#[doc(hidden)]
#[inline]
pub fn resume_process_boxed<T>(cont: ProcessBoxCont<T>, pid: Grc<ProcessId>, t: T, p: &Point) -> simulation::Result<()> {
    if !is_process_cancelled(&pid, p) {
        return cont.call_box((Result::Ok(t), pid, p));
    }
    revoke_process_boxed(cont, pid, p)
}
/// Specifies how cancellation is linked between a parent process and a
/// child process (see `ProcessId::connect_cancel`).
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum ProcessCancellation {
/// Initiating cancellation of either process initiates it for the other.
CancelTogether,
/// Initiating cancellation of the parent initiates it for the child,
/// but not the other way round.
CancelChildAfterParent,
/// Initiating cancellation of the child initiates it for the parent,
/// but not the other way round.
CancelParentAfterChild,
/// No cancellation link is established between the two processes.
CancelInIsolation
}
/// The identifier of a process, holding its cancellation, preemption,
/// reactivation and interruption state.
pub struct ProcessId {
// Set once cancellation has been requested; never reset in the code shown here.
cancel_initiated: RefComp<bool>,
// Set together with `cancel_initiated`; cleared again by `deactivate_cancel`
// when the process is revoked.
cancel_activated: RefComp<bool>,
// Nesting counter maintained by `begin_preemption_at` / `end_preemption_at`.
preemption_count: RefComp<isize>,
// Source of the `ProcessEvent` notifications published by `observable`.
source: ObservableSource<ProcessEvent>,
// Set by `prepare`; guards against starting two processes with the same id.
started: RefComp<bool>,
// NOTE(review): not used in the visible part of the file; presumably tracks
// the passivation state — confirm.
react_flag: RefComp<Option<bool>>,
// Continuation stored for later reactivation; rewrapped by `on_preempted_at`.
react_cont: RefComp<Option<ProcessBoxCont<()>>>,
// NOTE(review): not used in the visible part of the file; presumably the
// priority to reactivate with — confirm.
react_priority: RefComp<isize>,
// `Some(false)` while held, `Some(true)` after an interruption or preemption,
// `None` once the hold has completed normally.
interrupt_flag: RefComp<Option<bool>>,
// Continuation of a process that is currently held.
interrupt_cont: RefComp<Option<ProcessBoxCont<()>>>,
// Modeling time at which the current hold is scheduled to end.
interrupt_time: RefComp<f64>,
// Version counter; bumped on interruption so that a stale wake-up event of
// a hold can detect that it is no longer relevant.
interrupt_ver: RefComp<i64>,
// Priority of the point at which the hold was started.
interrupt_priority: RefComp<isize>
}
impl ProcessId {
pub fn new() -> NewProcessId {
NewProcessId {}
}
fn prepare(pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
let y = pid.started.read_at(p);
if y {
panic!("Another process with the specified identifier has been started already")
} else {
pid.started.write_at(true, p)
}
let disposable = {
let observable = pid.observable();
let pid = Grc::downgrade(&pid);
observable
.subscribe(cons_observer(move |e, p| {
match pid.upgrade() {
None => {
let msg = String::from("It should not happen");
let err = Error::retry(msg);
Result::Err(err)
},
Some(pid) => {
match e {
ProcessEvent::CancelInitiating => {
if pid.is_cancel_activated(p) {
match ProcessId::interrupt(pid.clone()).call_event(p) {
Result::Ok(()) => ProcessId::reactivate(pid).call_event(p),
Result::Err(e) => Result::Err(e)
}
} else {
Result::Ok(())
}
},
ProcessEvent::PreemptionInitiating => {
ProcessId::on_preempted_at(pid, p)
},
ProcessEvent::PreemptionEnding => {
Result::Ok(())
}
}
}
}
})).call_event(p)
};
match disposable {
Result::Ok(_) => Result::Ok(()),
Result::Err(e) => Result::Err(e)
}
}
#[inline]
pub fn observable(&self) -> Publish<ProcessEvent> {
self.source.publish()
}
#[inline]
pub fn cancel_initiating(&self) -> impl Observable<Message = ProcessEvent> + Clone {
self.observable().filter(|m| { *m == ProcessEvent::CancelInitiating })
}
#[doc(hidden)]
#[inline]
pub fn is_cancel_initiated_at(&self, p: &Point) -> bool {
self.cancel_initiated.read_at(p)
}
#[inline]
pub fn is_cancel_initiated(pid: Grc<ProcessId>) -> impl Event<Item = bool> + Clone {
cons_event(move |p| Result::Ok(pid.is_cancel_initiated_at(p)))
}
#[inline]
fn is_cancel_activated(&self, p: &Point) -> bool {
self.cancel_activated.read_at(p)
}
#[inline]
fn deactivate_cancel(&self, p: &Point) {
self.cancel_activated.write_at(false, p);
}
#[inline]
pub fn cancel(pid: Grc<ProcessId>) -> CancelById {
CancelById { pid: pid }
}
fn _bind_cancel(this: Grc<ProcessId>, others: Vec<Grc<ProcessId>>, p: &Point) -> simulation::Result<DisposableBox> {
let this2 = this.clone();
let this1 = this;
let others2 = {
others.iter().map(|pid| pid.clone()).collect::<Vec<_>>()
};
let others1 = others;
let hs1 = {
others1.iter().map(|pid| {
let this1 = Grc::downgrade(&this1);
pid.cancel_initiating()
.subscribe(cons_observer(move |_, p| {
match this1.upgrade() {
None => Result::Ok(()),
Some(this1) => this1.initiate_cancel_at(p)
}
}))
})
};
let hs1 = event_sequence(hs1).call_event(p);
match hs1 {
Result::Err(e) => Result::Err(e),
Result::Ok(hs1) => {
let hs2 = {
others2.iter().map(|pid| {
let pid = Grc::downgrade(&pid);
this2.cancel_initiating()
.subscribe(cons_observer(move |_, p| {
match pid.upgrade() {
None => Result::Ok(()),
Some(pid) => pid.initiate_cancel_at(p)
}
}))
})
};
let hs2 = event_sequence(hs2).call_event(p);
match hs2 {
Result::Err(e) => Result::Err(e),
Result::Ok(hs2) => {
let hs1 = concat_disposables(hs1.into_iter());
let hs2 = concat_disposables(hs2.into_iter());
Result::Ok(hs1.merge(hs2).into_boxed())
}
}
}
}
}
fn connect_cancel(parent: Grc<ProcessId>, cancellation: ProcessCancellation, child: Grc<ProcessId>, p: &Point) -> simulation::Result<DisposableBox> {
let h1 = match cancellation {
ProcessCancellation::CancelTogether |
ProcessCancellation::CancelChildAfterParent => {
let child = Grc::downgrade(&child);
parent.cancel_initiating()
.subscribe(cons_observer(move |_, p| {
match child.upgrade() {
None => Result::Ok(()),
Some(child) => child.initiate_cancel_at(p)
}
}))
.call_event(p)
},
ProcessCancellation::CancelParentAfterChild |
ProcessCancellation::CancelInIsolation => {
Result::Ok(empty_disposable().into_boxed())
}
};
match h1 {
Result::Err(e) => Result::Err(e),
Result::Ok(h1) => {
let h2 = match cancellation {
ProcessCancellation::CancelTogether |
ProcessCancellation::CancelParentAfterChild => {
let parent = Grc::downgrade(&parent);
child.cancel_initiating()
.subscribe(cons_observer(move |_, p| {
match parent.upgrade() {
None => Result::Ok(()),
Some(parent) => parent.initiate_cancel_at(p)
}
}))
.call_event(p)
},
ProcessCancellation::CancelChildAfterParent |
ProcessCancellation::CancelInIsolation => {
Result::Ok(empty_disposable().into_boxed())
}
};
match h2 {
Result::Err(e) => {
h1.call_box((p,))?;
Result::Err(e)
},
Result::Ok(h2) => Result::Ok(h1.merge(h2).into_boxed())
}
}
}
}
#[doc(hidden)]
pub fn initiate_cancel_at(&self, p: &Point) -> simulation::Result<()> {
let f = self.cancel_initiated.read_at(p);
if !f {
self.cancel_initiated.write_at(true, p);
self.cancel_activated.write_at(true, p);
self.source.trigger_at(&ProcessEvent::CancelInitiating, p)
} else {
Result::Ok(())
}
}
#[doc(hidden)]
pub fn begin_preemption_at(&self, p: &Point) -> simulation::Result<()> {
let f = self.cancel_initiated.read_at(p);
if !f {
let n = self.preemption_count.read_at(p);
self.preemption_count.write_at(n + 1, p);
if n == 0 {
self.source.trigger_at(&ProcessEvent::PreemptionInitiating, p)
} else {
Result::Ok(())
}
} else {
Result::Ok(())
}
}
#[doc(hidden)]
pub fn end_preemption_at(&self, p: &Point) -> simulation::Result<()> {
let f = self.cancel_initiated.read_at(p);
if !f {
let n = self.preemption_count.read_at(p);
self.preemption_count.write_at(n - 1, p);
if n - 1 == 0 {
self.source.trigger_at(&ProcessEvent::PreemptionEnding, p)
} else {
Result::Ok(())
}
} else {
Result::Ok(())
}
}
#[inline]
pub fn begin_preemption(pid: Grc<ProcessId>) -> impl Event<Item = ()> + Clone {
cons_event(move |p| pid.begin_preemption_at(p))
}
#[inline]
pub fn end_preemption(pid: Grc<ProcessId>) -> impl Event<Item = ()> + Clone {
cons_event(move |p| pid.end_preemption_at(p))
}
#[inline]
pub fn preemption_initiating(&self) -> impl Observable<Message = ProcessEvent> + Clone {
self.observable().filter(|m| { *m == ProcessEvent::PreemptionInitiating })
}
#[inline]
pub fn preemption_ending(&self) -> impl Observable<Message = ProcessEvent> + Clone {
self.observable().filter(|m| { *m == ProcessEvent::PreemptionEnding })
}
#[inline]
fn is_preempted(&self, p: &Point) -> bool {
self.preemption_count.read_at(p) > 0
}
#[inline]
pub fn interrupt(pid: Grc<ProcessId>) -> Interrupt {
Interrupt { pid: pid }
}
#[inline]
pub fn is_interrupted(pid: Grc<ProcessId>) -> IsInterrupted {
IsInterrupted { pid: pid }
}
#[inline]
pub fn interruption_time(pid: Grc<ProcessId>) -> InterruptionTime {
InterruptionTime { pid: pid }
}
#[inline]
pub fn is_passivated(pid: Grc<ProcessId>) -> IsPassivated {
IsPassivated { pid: pid }
}
#[inline]
pub fn reactivate(pid: Grc<ProcessId>) -> Reactivate {
Reactivate { pid: pid }
}
#[inline]
pub fn reactivate_immediately(pid: Grc<ProcessId>) -> ReactivateImmediately {
ReactivateImmediately { pid: pid }
}
#[inline]
pub fn reactivate_many_immediately<I>(pids: I) -> ReactivateManyImmediately<I::IntoIter>
where I: IntoIterator<Item = Grc<ProcessId>> + 'static
{
ReactivateManyImmediately { pids: pids.into_iter() }
}
fn on_preempted_at(pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
match pid.interrupt_cont.swap_at(None, p) {
Some(cont) => {
let t = pid.interrupt_time.read_at(p);
let dt = t - p.time;
let v = pid.interrupt_ver.read_at(p);
let priority = pid.interrupt_priority.read_at(p);
pid.interrupt_flag.write_at(Some(true), p);
pid.interrupt_ver.write_at(v.wrapping_add(1), p);
let cont = substitute_process_boxed(cont, move |cont, pid, (), p| {
restore_process_priority(priority, hold_process(dt))
.call_process_boxed(cont, pid, p)
});
reenter_process(cont, pid, (), p)
},
None => {
match pid.react_cont.swap_at(None, p) {
None => Result::Ok(()),
Some(cont) => {
let cont = substitute_process_boxed(cont, move |cont, pid, (), p| {
reenter_process(cont, pid, (), p)
});
pid.react_cont.write_at(Some(cont), p);
Result::Ok(())
}
}
}
}
}
}
/// Two identifiers are considered equal when their `started` cells compare
/// equal.
///
/// NOTE(review): this relies on how `RefComp` implements equality, which is
/// defined elsewhere — presumably by identity of the cell; confirm.
impl PartialEq for ProcessId {

    fn eq(&self, other: &Self) -> bool {
        PartialEq::eq(&self.started, &other.started)
    }
}

impl Eq for ProcessId {}
/// A lazy computation that creates a new process identifier.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct NewProcessId {}

impl Simulation for NewProcessId {

    type Item = ProcessId;

    /// Creates the identifier in its initial state: nothing started, no
    /// cancellation, no preemption and no stored continuations. The
    /// interruption time starts at the start time of the simulation run.
    #[doc(hidden)]
    fn call_simulation(self, r: &SimulationRun) -> simulation::Result<Self::Item> {
        let start_time = r.specs.start_time;
        let pid = ProcessId {
            cancel_initiated: RefComp::new(false),
            cancel_activated: RefComp::new(false),
            preemption_count: RefComp::new(0),
            source: ObservableSource::new(),
            started: RefComp::new(false),
            react_flag: RefComp::new(None),
            react_cont: RefComp::new(None),
            react_priority: RefComp::new(0),
            interrupt_flag: RefComp::new(None),
            interrupt_cont: RefComp::new(None),
            interrupt_time: RefComp::new(start_time),
            interrupt_ver: RefComp::new(0),
            interrupt_priority: RefComp::new(0)
        };
        Result::Ok(pid)
    }
}
/// The events published by a process identifier through its observable.
#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord)]
pub enum ProcessEvent {
/// Triggered when cancellation of the process is initiated.
CancelInitiating,
/// Triggered when the preemption counter leaves zero.
PreemptionInitiating,
/// Triggered when the preemption counter returns to zero.
PreemptionEnding
}
/// A process computation written in continuation-passing style.
///
/// Running a process passes its outcome, the process identifier and the
/// current point to a continuation.
pub trait Process {

    /// The type of the value produced by the process.
    type Item;

    /// Runs the process with a generic continuation.
    #[doc(hidden)]
    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static;

    /// Runs the process with a boxed continuation.
    #[doc(hidden)]
    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>;

    /// Chains a second process that is built from the result of this one.
    #[inline]
    fn and_then<U, F>(self, f: F) -> AndThen<Self, U, F>
        where Self: Sized + 'static,
              U: Process + 'static,
              F: FnOnce(Self::Item) -> U + 'static,
    {
        AndThen { comp: self, f, _phantom: PhantomData }
    }

    /// Transforms the result of this process with `f`.
    #[inline]
    fn map<B, F>(self, f: F) -> Map<Self, B, F>
        where Self: Sized + 'static,
              F: FnOnce(Self::Item) -> B + 'static,
    {
        Map { comp: self, f, _phantom: PhantomData }
    }

    /// Runs this process and then `other`, yielding both results as a pair.
    #[inline]
    fn zip<U>(self, other: U) -> Zip<Self, U>
        where Self: Sized + 'static,
              U: Process + 'static
    {
        Zip { comp: self, other }
    }

    /// Applies the function produced by this process to the result of `other`.
    #[inline]
    fn ap<U, B>(self, other: U) -> Ap<Self, U, B>
        where Self: Sized + 'static,
              Self::Item: FnOnce(U::Item) -> B,
              U: Process + 'static,
              B: 'static
    {
        Ap { comp: self, other, _phantom: PhantomData }
    }

    /// Runs `finalization` after this process, whatever its outcome was.
    #[inline]
    fn finally<U>(self, finalization: U) -> Finally<Self, U>
        where Self: Sized + 'static,
              U: Process<Item = ()> + 'static
    {
        Finally { comp: self, finalization }
    }

    /// Converts the process into an event that starts it under a freshly
    /// created identifier.
    #[inline]
    fn run(self) -> Run<Self>
        where Self: Process<Item = ()> + Sized
    {
        Run { comp: self, pid: None }
    }

    /// Converts the process into an event that starts it under `pid`.
    #[inline]
    fn run_using_id(self, pid: Grc<ProcessId>) -> Run<Self>
        where Self: Process<Item = ()> + Sized
    {
        let pid = Some(pid);
        Run { comp: self, pid }
    }

    /// Erases the concrete type of the process by boxing it.
    #[inline]
    fn into_boxed(self) -> ProcessBox<Self::Item>
        where Self: Sized + 'static
    {
        let comp = self;
        ProcessBox::new(move |cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point| {
            comp.call_process_boxed(cont, pid, p)
        })
    }
}
/// Conversion into a `Process` computation.
pub trait IntoProcess {
/// The process type produced by the conversion.
type Process: Process<Item = Self::Item>;
/// The type of the value produced by the resulting process.
type Item;
/// Performs the conversion.
fn into_process(self) -> Self::Process;
}
/// Every process converts into itself.
impl<M: Process> IntoProcess for M {

    type Process = M;

    type Item = M::Item;

    #[inline]
    fn into_process(self) -> M {
        self
    }
}
/// A boxed, type-erased process continuation.
#[doc(hidden)]
pub struct ProcessBoxCont<T> {
    f: Box<dyn ProcessContFnBox<T>>
}

#[doc(hidden)]
impl<T> ProcessBoxCont<T> {

    /// Boxes the given continuation.
    #[inline]
    pub fn new<F>(f: F) -> Self
        where F: FnOnce(simulation::Result<T>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
    {
        let f: Box<dyn ProcessContFnBox<T>> = Box::new(f);
        ProcessBoxCont { f }
    }

    /// Consumes the continuation and calls it with the given arguments.
    #[inline]
    pub fn call_box(self, args: (simulation::Result<T>, Grc<ProcessId>, &Point)) -> simulation::Result<()> {
        let ProcessBoxCont { f } = self;
        f.call_box(args)
    }
}
/// Helper trait that allows a boxed `FnOnce` continuation to be called by value.
trait ProcessContFnBox<T> {

    /// Calls the boxed continuation, consuming the box.
    fn call_box(self: Box<Self>, args: (simulation::Result<T>, Grc<ProcessId>, &Point)) -> simulation::Result<()>;
}

impl<T, F> ProcessContFnBox<T> for F
    where F: for<'a> FnOnce(simulation::Result<T>, Grc<ProcessId>, &'a Point) -> simulation::Result<()>
{
    fn call_box(self: Box<Self>, args: (simulation::Result<T>, Grc<ProcessId>, &Point)) -> simulation::Result<()> {
        let (outcome, pid, p) = args;
        // Move the closure out of the box so that it can be called by value.
        let callee: F = *self;
        callee(outcome, pid, p)
    }
}
/// A boxed, type-erased process computation.
#[must_use = "computations are lazy and do nothing unless to be run"]
pub struct ProcessBox<T> {
    f: Box<dyn ProcessFnBox<T>>
}

#[doc(hidden)]
impl<T> ProcessBox<T> {

    /// Boxes the given function as a process computation.
    #[inline]
    pub fn new<F>(f: F) -> Self
        where F: FnOnce(ProcessBoxCont<T>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
    {
        let f: Box<dyn ProcessFnBox<T>> = Box::new(f);
        ProcessBox { f }
    }

    /// Consumes the boxed process and runs it with the given arguments.
    #[inline]
    pub fn call_box(self, args: (ProcessBoxCont<T>, Grc<ProcessId>, &Point)) -> simulation::Result<()> {
        let ProcessBox { f } = self;
        f.call_box(args)
    }
}
/// A boxed process is itself a process; a generic continuation is boxed
/// before the call.
impl<T> Process for ProcessBox<T> {

    type Item = T;

    #[doc(hidden)]
    #[inline]
    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
    {
        self.call_box((ProcessBoxCont::new(cont), pid, p))
    }

    #[doc(hidden)]
    #[inline]
    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
        let args = (cont, pid, p);
        self.call_box(args)
    }

    /// Already boxed, so the value is returned unchanged.
    #[inline]
    fn into_boxed(self) -> ProcessBox<Self::Item>
        where Self: Sized + 'static
    {
        self
    }
}
/// Helper trait that allows a boxed `FnOnce` process function to be called by value.
trait ProcessFnBox<T> {

    /// Calls the boxed function, consuming the box.
    fn call_box(self: Box<Self>, args: (ProcessBoxCont<T>, Grc<ProcessId>, &Point)) -> simulation::Result<()>;
}

impl<T, F> ProcessFnBox<T> for F
    where F: for<'a> FnOnce(ProcessBoxCont<T>, Grc<ProcessId>, &'a Point) -> simulation::Result<()>
{
    fn call_box(self: Box<Self>, args: (ProcessBoxCont<T>, Grc<ProcessId>, &Point)) -> simulation::Result<()> {
        let (cont, pid, p) = args;
        // Move the closure out of the box so that it can be called by value.
        let callee: F = *self;
        callee(cont, pid, p)
    }
}
/// A process that immediately yields a stored value.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct Return<T> {
    val: T
}

impl<T> Process for Return<T> {

    type Item = T;

    /// Passes the stored value to the continuation, or revokes the process
    /// when cancellation is activated.
    #[doc(hidden)]
    #[inline]
    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
    {
        if !is_process_cancelled(&pid, p) {
            return cont(Result::Ok(self.val), pid, p);
        }
        revoke_process(cont, pid, p)
    }

    /// Boxed-continuation variant of `call_process`.
    #[doc(hidden)]
    #[inline]
    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
        if !is_process_cancelled(&pid, p) {
            return cont.call_box((Result::Ok(self.val), pid, p));
        }
        revoke_process_boxed(cont, pid, p)
    }
}
/// A process whose body is constructed only when it is run.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct Delay<F, M> {
    f: F,
    _phantom: PhantomData<M>
}

impl<F, M> Process for Delay<F, M>
    where F: FnOnce() -> M + 'static,
          M: Process + 'static
{
    type Item = M::Item;

    /// Calls the stored closure and runs the process it returns.
    #[doc(hidden)]
    #[inline]
    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
    {
        let Delay { f, .. } = self;
        let deferred = return_process(()).and_then(move |()| f());
        deferred.call_process(cont, pid, p)
    }

    /// Boxed-continuation variant of `call_process`.
    #[doc(hidden)]
    #[inline]
    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
        let Delay { f, .. } = self;
        let deferred = return_process(()).and_then(move |()| f());
        deferred.call_process_boxed(cont, pid, p)
    }
}
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct AndThen<M, U, F> {
comp: M,
f: F,
_phantom: PhantomData<U>
}
impl<M, U, F> Process for AndThen<M, U, F>
where F: FnOnce(M::Item) -> U + 'static,
M: Process + 'static,
U: Process + 'static
{
type Item = U::Item;
#[doc(hidden)]
#[inline]
fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
{
if is_process_cancelled(&pid, p) {
revoke_process(cont, pid, p)
} else {
let AndThen { comp, f, _phantom } = self;
let cont = move |a: simulation::Result<M::Item>, pid: Grc<ProcessId>, p: &Point| {
match a {
Result::Ok(a) => {
f(a).call_process(cont, pid, p)
},
Result::Err(Error::Cancel) => {
revoke_process(cont, pid, p)
},
Result::Err(Error::Other(e)) => {
match e.deref() {
&OtherError::Retry(_) => {
cut_error_process(cont, pid, e, p)
},
&OtherError::Panic(_) => {
cut_error_process(cont, pid, e, p)
},
&OtherError::IO(_) => {
propagate_error_process(cont, pid, e, p)
}
}
}
}
};
comp.call_process(cont, pid, p)
}
}
#[doc(hidden)]
#[inline]
fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
if is_process_cancelled(&pid, p) {
revoke_process_boxed(cont, pid, p)
} else {
let AndThen { comp, f, _phantom } = self;
let cont = move |a: simulation::Result<M::Item>, pid: Grc<ProcessId>, p: &Point| {
match a {
Result::Ok(a) => {
f(a).call_process_boxed(cont, pid, p)
},
Result::Err(Error::Cancel) => {
revoke_process_boxed(cont, pid, p)
},
Result::Err(Error::Other(e)) => {
match e.deref() {
&OtherError::Retry(_) => {
cut_error_process_boxed(cont, pid, e, p)
},
&OtherError::Panic(_) => {
cut_error_process_boxed(cont, pid, e, p)
},
&OtherError::IO(_) => {
propagate_error_process_boxed(cont, pid, e, p)
}
}
}
}
};
comp.call_process(cont, pid, p)
}
}
}
/// A process followed by a finalization part that is run whatever the
/// outcome of the main part was.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct Finally<M, U> {
    comp: M,
    finalization: U
}

impl<M, U> Process for Finally<M, U>
    where M: Process + 'static,
          U: Process<Item = ()> + 'static
{
    type Item = M::Item;

    /// Runs the main part, then the finalization, then the continuation.
    ///
    /// If the finalization succeeds, the continuation receives the outcome of
    /// the main part. If it fails, the continuation receives that error,
    /// merged with the error of the main part when both failed.
    #[doc(hidden)]
    #[inline]
    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
    {
        if is_process_cancelled(&pid, p) {
            return revoke_process(cont, pid, p);
        }
        let Finally { comp, finalization } = self;
        let after_main = move |a: simulation::Result<M::Item>, pid: Grc<ProcessId>, p: &Point| {
            let after_final = move |a0: simulation::Result<()>, pid: Grc<ProcessId>, p: &Point| {
                let e0 = match a0 {
                    Result::Ok(()) => return cont(a, pid, p),
                    Result::Err(e0) => e0
                };
                match a {
                    Result::Err(e) => cont(Result::Err(e.merge(&e0)), pid, p),
                    Result::Ok(_) => cont(Result::Err(e0), pid, p)
                }
            };
            finalization.call_process(after_final, pid, p)
        };
        comp.call_process(after_main, pid, p)
    }

    /// Boxed-continuation variant of `call_process`.
    #[doc(hidden)]
    #[inline]
    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
        if is_process_cancelled(&pid, p) {
            return revoke_process_boxed(cont, pid, p);
        }
        let Finally { comp, finalization } = self;
        let after_main = move |a: simulation::Result<M::Item>, pid: Grc<ProcessId>, p: &Point| {
            let after_final = move |a0: simulation::Result<()>, pid: Grc<ProcessId>, p: &Point| {
                let e0 = match a0 {
                    Result::Ok(()) => return cont.call_box((a, pid, p)),
                    Result::Err(e0) => e0
                };
                match a {
                    Result::Err(e) => cont.call_box((Result::Err(e.merge(&e0)), pid, p)),
                    Result::Ok(_) => cont.call_box((Result::Err(e0), pid, p))
                }
            };
            finalization.call_process(after_final, pid, p)
        };
        comp.call_process(after_main, pid, p)
    }
}
/// A process whose result is transformed by a function.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct Map<M, B, F> {
    comp: M,
    f: F,
    _phantom: PhantomData<B>
}

impl<M, B, F> Process for Map<M, B, F>
    where F: FnOnce(M::Item) -> B + 'static,
          M: Process + 'static,
          B: 'static
{
    type Item = B;

    /// Runs the inner process and yields `f` applied to its result.
    #[doc(hidden)]
    #[inline]
    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
    {
        let Map { comp, f, .. } = self;
        let mapped = comp.and_then(move |a| return_process(f(a)));
        mapped.call_process(cont, pid, p)
    }

    /// Boxed-continuation variant of `call_process`.
    #[doc(hidden)]
    #[inline]
    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
        let Map { comp, f, .. } = self;
        let mapped = comp.and_then(move |a| return_process(f(a)));
        mapped.call_process_boxed(cont, pid, p)
    }
}
/// Two processes run one after another, with their results paired.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct Zip<M, U> {
    comp: M,
    other: U,
}

impl<M, U> Process for Zip<M, U>
    where M: Process + 'static,
          U: Process + 'static
{
    type Item = (M::Item, U::Item);

    /// Runs `comp`, then `other`, and yields both results as a tuple.
    #[doc(hidden)]
    #[inline]
    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
    {
        let Zip { comp, other } = self;
        let zipped = comp.and_then(move |a| {
            other.and_then(move |b| return_process((a, b)))
        });
        zipped.call_process(cont, pid, p)
    }

    /// Boxed-continuation variant of `call_process`.
    #[doc(hidden)]
    #[inline]
    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
        let Zip { comp, other } = self;
        let zipped = comp.and_then(move |a| {
            other.and_then(move |b| return_process((a, b)))
        });
        zipped.call_process_boxed(cont, pid, p)
    }
}
/// Applicative application: a process yielding a function, applied to the
/// result of another process.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct Ap<M, U, B> {
    comp: M,
    other: U,
    _phantom: PhantomData<B>
}

impl<M, U, B> Process for Ap<M, U, B>
    where M: Process + 'static,
          U: Process + 'static,
          M::Item: FnOnce(U::Item) -> B,
          B: 'static
{
    type Item = B;

    /// Runs `comp` to obtain the function, then `other` to obtain the
    /// argument, and yields the function applied to the argument.
    #[doc(hidden)]
    #[inline]
    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
    {
        let Ap { comp, other, .. } = self;
        let applied = comp.and_then(move |f| {
            other.and_then(move |a| return_process(f(a)))
        });
        applied.call_process(cont, pid, p)
    }

    /// Boxed-continuation variant of `call_process`.
    #[doc(hidden)]
    #[inline]
    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
        let Ap { comp, other, .. } = self;
        let applied = comp.and_then(move |f| {
            other.and_then(move |a| return_process(f(a)))
        });
        applied.call_process_boxed(cont, pid, p)
    }
}
/// An event that starts a process, optionally under a given identifier.
#[must_use = "computations are lazy and do nothing unless to be run"]
pub struct Run<M> {
    comp: M,
    pid: Option<Grc<ProcessId>>
}

impl<M> Event for Run<M>
    where M: Process<Item = ()>
{
    type Item = M::Item;

    /// Starts the process: a new identifier is created when none was given,
    /// the identifier is prepared, and the process is run with a continuation
    /// that ignores the final outcome.
    #[doc(hidden)]
    #[inline]
    fn call_event(self, p: &Point) -> simulation::Result<M::Item> {
        let Run { comp, pid } = self;
        let pid = match pid {
            Some(pid) => pid,
            None => Grc::new(ProcessId::new().into_event().call_event(p)?)
        };
        let cont = &initial_cont;
        ProcessId::prepare(pid.clone(), p)?;
        comp.call_process(cont, pid, p)
    }
}
/// The outermost continuation of a started process: it ignores the final
/// outcome, the identifier and the point, and always succeeds.
fn initial_cont(_a: simulation::Result<()>, _pid: Grc<ProcessId>, _p: &Point) -> simulation::Result<()> {
Result::Ok(())
}
/// A process that yields the identifier of the process it runs in.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct Id {}

impl Process for Id {

    type Item = Grc<ProcessId>;

    /// Passes a clone of the current identifier to the continuation.
    #[doc(hidden)]
    #[inline]
    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
    {
        let current = pid.clone();
        cont(Result::Ok(current), pid, p)
    }

    /// Boxed-continuation variant of `call_process`.
    #[doc(hidden)]
    #[inline]
    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
        let current = pid.clone();
        cont.call_box((Result::Ok(current), pid, p))
    }
}
/// An event that initiates cancellation of the process with the stored identifier.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct CancelById {
    pid: Grc<ProcessId>
}

impl Event for CancelById {

    type Item = ();

    /// Initiates cancellation at the point `p`.
    #[doc(hidden)]
    #[inline]
    fn call_event(self, p: &Point) -> simulation::Result<()> {
        self.pid.initiate_cancel_at(p)
    }
}
/// A process that cancels itself.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct Cancel<T> {
    _phantom: PhantomData<T>
}

impl<T> Process for Cancel<T> {

    type Item = T;

    /// Initiates cancellation of the current process. If cancellation is
    /// then still activated, the process is revoked; otherwise nothing more
    /// happens and the continuation is dropped.
    #[doc(hidden)]
    #[inline]
    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
    {
        pid.initiate_cancel_at(p)?;
        if is_process_cancelled(&pid, p) {
            revoke_process(cont, pid, p)
        } else {
            Result::Ok(())
        }
    }

    /// Boxed-continuation variant of `call_process`.
    #[doc(hidden)]
    #[inline]
    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
        pid.initiate_cancel_at(p)?;
        if is_process_cancelled(&pid, p) {
            revoke_process_boxed(cont, pid, p)
        } else {
            Result::Ok(())
        }
    }
}
/// A process that suspends itself for `dt` units of modeling time.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct Hold {
dt: f64
}
impl Process for Hold {
type Item = ();
/// Boxes the continuation and delegates to `call_process_boxed`.
#[doc(hidden)]
#[inline]
fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
{
let cont = ProcessBoxCont::new(cont);
self.call_process_boxed(cont, pid, p)
}
/// Stores the continuation in the process identifier and enqueues a
/// wake-up event at the current time plus `dt`.
#[doc(hidden)]
#[inline]
fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
let Hold { dt } = self;
let t = p.time + dt;
// Record the hold in the identifier so that it can be interrupted or preempted.
pid.interrupt_cont.write_at(Some(cont), p);
pid.interrupt_flag.write_at(Some(false), p);
pid.interrupt_time.write_at(t, p);
pid.interrupt_priority.write_at(p.priority, p);
// Remember the version; a changed version at wake-up time means the hold
// was interrupted or preempted and this wake-up event is stale.
let v = pid.interrupt_ver.read_at(p);
enqueue_event(t, {
cons_event(move |p| {
if p.priority >= p.minimal_priority {
let v2 = pid.interrupt_ver.read_at(p);
if v == v2 {
// The hold completed normally: clear the state and resume.
pid.interrupt_flag.write_at(None, p);
let cont = pid.interrupt_cont.swap_at(None, p).unwrap();
resume_process_boxed(cont, pid, (), p)
} else {
Result::Ok(())
}
} else {
// The point's priority is below the minimal priority: the process is
// cancelled instead of being resumed.
pid.initiate_cancel_at(p)?;
if is_process_cancelled(&pid, p) {
let v2 = pid.interrupt_ver.read_at(p);
if v == v2 {
pid.interrupt_flag.write_at(None, p);
let cont = pid.interrupt_cont.swap_at(None, p).unwrap();
revoke_process_boxed(cont, pid, p)
} else {
Result::Ok(())
}
} else {
Result::Ok(())
}
}
}).into_boxed()
}).call_event(p)
}
}
/// An event computation that interrupts a process waiting in `Hold`,
/// scheduling its continuation at the current modeling time.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct Interrupt {
    /// The identifier of the process to interrupt.
    pid: Grc<ProcessId>
}

impl Event for Interrupt {
    type Item = ();

    /// Takes the stored interruption continuation, if any, bumps the
    /// interruption version, raises the interruption flag and enqueues the
    /// continuation with the priority saved for it.
    #[doc(hidden)]
    #[inline]
    fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
        let Interrupt { pid } = self;

        // Nothing to do when no continuation is stored.
        let cont = match pid.interrupt_cont.swap_at(None, p) {
            Some(cont) => cont,
            None => return Result::Ok(())
        };

        let ver = pid.interrupt_ver.read_at(p);
        let priority = pid.interrupt_priority.read_at(p);
        pid.interrupt_ver.write_at(ver.wrapping_add(1), p);
        pid.interrupt_flag.write_at(Some(true), p);

        let resume = cons_event(move |p| {
            resume_process_boxed(cont, pid, (), p)
        });
        enqueue_event_with_priority(p.time, priority, resume.into_boxed()).call_event(p)
    }
}
/// An event computation that tests whether the interruption flag of the
/// process is currently raised.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct IsInterrupted {
    /// The identifier of the process to inspect.
    pid: Grc<ProcessId>
}

impl Event for IsInterrupted {
    type Item = bool;

    /// Returns `true` only when the interruption flag equals `Some(true)`.
    #[doc(hidden)]
    #[inline]
    fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
        let flag = self.pid.interrupt_flag.read_at(p);
        Result::Ok(matches!(flag, Some(true)))
    }
}
/// An event computation that returns the stored interruption time of the
/// process while its interruption flag equals `Some(false)`.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct InterruptionTime {
    /// The identifier of the process to inspect.
    pid: Grc<ProcessId>
}

impl Event for InterruptionTime {
    type Item = Option<f64>;

    /// Returns `Some(time)` when the flag is `Some(false)`, which is the
    /// value written by `Hold` when it starts waiting; otherwise `None`.
    #[doc(hidden)]
    #[inline]
    fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
        let pid = self.pid;
        let time = if matches!(pid.interrupt_flag.read_at(p), Some(false)) {
            Some(pid.interrupt_time.read_at(p))
        } else {
            None
        };
        Result::Ok(time)
    }
}
/// A process computation that passivates the current process: its
/// continuation is stored in the process identifier and nothing else is
/// scheduled here.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct Passivate {}

impl Process for Passivate {
    type Item = ();

    /// Boxes the continuation and delegates to `call_process_boxed`.
    #[doc(hidden)]
    #[inline]
    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
    {
        self.call_process_boxed(ProcessBoxCont::new(cont), pid, p)
    }

    /// Stores the continuation as the reactivation continuation.
    ///
    /// Passes a retry error to the continuation when the process is already
    /// marked as passivated.
    ///
    /// # Panics
    ///
    /// Panics if a reactivation continuation is already stored although the
    /// passivation flag is not set.
    #[doc(hidden)]
    #[inline]
    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
        if matches!(pid.react_flag.read_at(p), Some(true)) {
            let msg = String::from("Cannot passivate the process twice");
            let err = OtherError::retry(msg);
            return cut_error_process_boxed(cont, pid, err, p);
        }

        let previous = pid.react_cont.swap_at(Some(cont), p);
        if previous.is_some() {
            panic!("The reactivation continuation has diverged");
        }

        pid.react_priority.write_at(p.priority, p);
        pid.react_flag.write_at(Some(true), p);
        Result::Ok(())
    }
}
/// A process computation that passivates the current process and then
/// runs the specified event computation.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct PassivateBefore<M> {
    /// The event computation to run right after the process is marked as passivated.
    comp: M
}

impl<M> Process for PassivateBefore<M>
    where M: Event<Item = ()>
{
    type Item = ();

    /// Boxes the continuation and delegates to `call_process_boxed`.
    #[doc(hidden)]
    #[inline]
    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
    {
        self.call_process_boxed(ProcessBoxCont::new(cont), pid, p)
    }

    /// Stores the continuation as the reactivation continuation, marks the
    /// process as passivated and then runs the nested event computation.
    ///
    /// Passes a retry error to the continuation when the process is already
    /// marked as passivated.
    ///
    /// # Panics
    ///
    /// Panics if a reactivation continuation is already stored although the
    /// passivation flag is not set.
    #[doc(hidden)]
    #[inline]
    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
        if matches!(pid.react_flag.read_at(p), Some(true)) {
            let msg = String::from("Cannot passivate the process twice");
            let err = OtherError::retry(msg);
            return cut_error_process_boxed(cont, pid, err, p);
        }

        let previous = pid.react_cont.swap_at(Some(cont), p);
        if previous.is_some() {
            panic!("The reactivation continuation has diverged");
        }

        pid.react_priority.write_at(p.priority, p);
        pid.react_flag.write_at(Some(true), p);
        self.comp.call_event(p)
    }
}
/// An event computation that tests whether the process is currently
/// marked as passivated.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct IsPassivated {
    /// The identifier of the process to inspect.
    pid: Grc<ProcessId>
}

impl Event for IsPassivated {
    type Item = bool;

    /// Returns `true` only when the passivation flag equals `Some(true)`.
    #[doc(hidden)]
    #[inline]
    fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
        let flag = self.pid.react_flag.read_at(p);
        Result::Ok(matches!(flag, Some(true)))
    }
}
/// An event computation that reactivates a passivated process by
/// enqueueing its stored continuation at the current modeling time.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct Reactivate {
    /// The identifier of the process to reactivate.
    pid: Grc<ProcessId>
}

impl Event for Reactivate {
    type Item = ();

    /// Takes the reactivation continuation, clears the passivation flag and
    /// enqueues the continuation with the priority saved at passivation.
    /// Does nothing when no continuation is stored.
    ///
    /// # Panics
    ///
    /// Panics if a continuation is stored while the passivation flag is not
    /// `Some(true)`.
    #[doc(hidden)]
    #[inline]
    fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
        let Reactivate { pid } = self;

        let cont = match pid.react_cont.swap_at(None, p) {
            Some(cont) => cont,
            None => return Result::Ok(())
        };

        if !matches!(pid.react_flag.swap_at(None, p), Some(true)) {
            panic!("The reactivation continuation has diverged");
        }

        let priority = pid.react_priority.read_at(p);
        let resume = cons_event(move |p| {
            resume_process_boxed(cont, pid, (), p)
        });
        enqueue_event_with_priority(p.time, priority, resume.into_boxed()).call_event(p)
    }
}
/// An event computation that reactivates a passivated process, resuming
/// it right away when the current priority matches the saved one.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct ReactivateImmediately {
    /// The identifier of the process to reactivate.
    pid: Grc<ProcessId>
}

impl Event for ReactivateImmediately {
    type Item = ();

    /// Takes the reactivation continuation and clears the passivation flag.
    /// The continuation is resumed directly if `p.priority` equals the
    /// priority saved at passivation; otherwise it is enqueued at the
    /// current time with that saved priority. Does nothing when no
    /// continuation is stored.
    ///
    /// # Panics
    ///
    /// Panics if a continuation is stored while the passivation flag is not
    /// `Some(true)`.
    #[doc(hidden)]
    #[inline]
    fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
        let ReactivateImmediately { pid } = self;

        let cont = match pid.react_cont.swap_at(None, p) {
            Some(cont) => cont,
            None => return Result::Ok(())
        };

        if !matches!(pid.react_flag.swap_at(None, p), Some(true)) {
            panic!("The reactivation continuation has diverged");
        }

        let priority = pid.react_priority.read_at(p);
        if p.priority == priority {
            return resume_process_boxed(cont, pid, (), p);
        }

        let resume = cons_event(move |p| {
            resume_process_boxed(cont, pid, (), p)
        });
        enqueue_event_with_priority(p.time, priority, resume.into_boxed()).call_event(p)
    }
}
/// An event computation that reactivates, one after another, all
/// processes whose identifiers are produced by the iterator.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct ReactivateManyImmediately<I> {
    /// The remaining process identifiers.
    pids: I
}

impl<I> Event for ReactivateManyImmediately<I>
    where I: Iterator<Item = Grc<ProcessId>> + 'static
{
    type Item = ();

    /// Reactivates the next process and then continues with the rest of
    /// the iterator through `yield_event`. Stops at the first error.
    #[doc(hidden)]
    #[inline]
    fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
        let ReactivateManyImmediately { mut pids } = self;
        match pids.next() {
            Some(pid) => {
                ProcessId::reactivate_immediately(pid).call_event(p)?;
                let rest = ReactivateManyImmediately { pids: pids };
                yield_event(rest).call_event(p)
            },
            None => Result::Ok(())
        }
    }
}
/// A process computation that repeats the computation produced by the
/// generator function again and again.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct Loop<F, M> {
    /// Produces the body of each iteration.
    f: F,
    _phantom: PhantomData<M>
}

impl<F, M> Process for Loop<F, M>
    where F: Fn() -> M + 'static,
          M: Process<Item = ()> + 'static
{
    type Item = M::Item;

    /// Runs one iteration and chains a fresh `Loop` with the same
    /// generator after it.
    #[doc(hidden)]
    #[inline(never)]
    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
    {
        let Loop { f, .. } = self;
        let body = f();
        body.and_then(move |()| {
                Loop { f, _phantom: PhantomData }
            })
            .call_process(cont, pid, p)
    }

    /// The boxed-continuation counterpart of `call_process`.
    #[doc(hidden)]
    #[inline(never)]
    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
        let Loop { f, .. } = self;
        let body = f();
        body.and_then(move |()| {
                Loop { f, _phantom: PhantomData }
            })
            .call_process_boxed(cont, pid, p)
    }
}
/// A process computation that runs the computations of an iterator one
/// after another and collects their results into a vector.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct Sequence<I, M> where M: Process {
    /// The computations that have not been started yet.
    comps: I,
    /// The results collected so far, in the order of completion.
    acc: Vec<M::Item>
}

impl<I, M> Process for Sequence<I, M>
    where I: Iterator<Item = M> + 'static,
          M: Process + 'static
{
    type Item = Vec<M::Item>;

    /// Runs the next computation, appends its result to the accumulator
    /// and continues with the rest; passes the accumulator to the
    /// continuation once the iterator is exhausted.
    #[doc(hidden)]
    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
    {
        let Sequence { mut comps, mut acc } = self;
        let next = comps.next();
        match next {
            Some(comp) => {
                comp.and_then(move |item| {
                        acc.push(item);
                        Sequence { comps, acc }
                    })
                    .call_process(cont, pid, p)
            },
            None => cont(Result::Ok(acc), pid, p)
        }
    }

    /// The boxed-continuation counterpart of `call_process`.
    #[doc(hidden)]
    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
        let Sequence { mut comps, mut acc } = self;
        let next = comps.next();
        match next {
            Some(comp) => {
                comp.and_then(move |item| {
                        acc.push(item);
                        Sequence { comps, acc }
                    })
                    .call_process_boxed(cont, pid, p)
            },
            None => cont.call_box((Result::Ok(acc), pid, p))
        }
    }
}
/// A process computation that runs the computations of an iterator one
/// after another and discards their results.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct Sequence_<I, M>
{
    /// The computations that have not been started yet.
    comps: I,
    _phantom: PhantomData<M>
}

impl<I, M> Process for Sequence_<I, M>
    where I: Iterator<Item = M> + 'static,
          M: Process + 'static
{
    type Item = ();

    /// Runs the next computation, ignores its result and continues with
    /// the rest; passes `()` to the continuation once the iterator is
    /// exhausted.
    #[doc(hidden)]
    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
    {
        let Sequence_ { mut comps, _phantom } = self;
        let next = comps.next();
        match next {
            Some(comp) => {
                comp.and_then(move |_| {
                        Sequence_ { comps, _phantom }
                    })
                    .call_process(cont, pid, p)
            },
            None => cont(Result::Ok(()), pid, p)
        }
    }

    /// The boxed-continuation counterpart of `call_process`.
    #[doc(hidden)]
    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
        let Sequence_ { mut comps, _phantom } = self;
        let next = comps.next();
        match next {
            Some(comp) => {
                comp.and_then(move |_| {
                        Sequence_ { comps, _phantom }
                    })
                    .call_process_boxed(cont, pid, p)
            },
            None => cont.call_box((Result::Ok(()), pid, p))
        }
    }
}
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct SpawnUsingId<M>
{
cancellation: ProcessCancellation,
comp: M,
comp_id: Grc<ProcessId>
}
impl<M> Process for SpawnUsingId<M>
where M: Process<Item = ()> + 'static,
{
type Item = ();
#[doc(hidden)]
fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
{
if is_process_cancelled(&pid, p) {
revoke_process(cont, pid, p)
} else {
let SpawnUsingId { cancellation, comp, comp_id } = self;
match ProcessId::prepare(comp_id.clone(), p) {
Result::Err(e) => Result::Err(e),
Result::Ok(()) => {
let hs = ProcessId::connect_cancel(pid.clone(), cancellation, comp_id.clone(), p);
match hs {
Result::Err(e) => Result::Err(e),
Result::Ok(hs) => {
enqueue_event(p.time, {
cons_event(move |p| {
let cont = move |a: simulation::Result<()>, _pid: Grc<ProcessId>, p: &Point| {
hs.dispose(p)?;
match a {
Result::Ok(()) => Result::Ok(()),
Result::Err(Error::Cancel) => Result::Ok(()),
Result::Err(Error::Other(e)) => Result::Err(Error::Other(e))
}
};
comp.call_process(cont, comp_id, p)
}).into_boxed()
}).and_then(move |()| {
cons_event(move |p| {
resume_process(cont, pid, (), p)
})
}).call_event(p)
}
}
}
}
}
}
#[doc(hidden)]
fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
if is_process_cancelled(&pid, p) {
revoke_process_boxed(cont, pid, p)
} else {
let SpawnUsingId { cancellation, comp, comp_id } = self;
match ProcessId::prepare(comp_id.clone(), p) {
Result::Err(e) => Result::Err(e),
Result::Ok(()) => {
let hs = ProcessId::connect_cancel(pid.clone(), cancellation, comp_id.clone(), p);
match hs {
Result::Err(e) => Result::Err(e),
Result::Ok(hs) => {
enqueue_event(p.time, {
cons_event(move |p| {
let cont = move |a: simulation::Result<()>, _pid: Grc<ProcessId>, p: &Point| {
hs.dispose(p)?;
match a {
Result::Ok(()) => Result::Ok(()),
Result::Err(Error::Cancel) => Result::Ok(()),
Result::Err(Error::Other(e)) => Result::Err(Error::Other(e))
}
};
comp.call_process(cont, comp_id, p)
}).into_boxed()
}).and_then(move |()| {
cons_event(move |p| {
resume_process_boxed(cont, pid, (), p)
})
}).call_event(p)
}
}
}
}
}
}
}
/// A process computation that spawns a child process under a freshly
/// created process identifier.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct Spawn<M>
{
    /// How the cancellation of the parent and the child is connected.
    cancellation: ProcessCancellation,
    /// The computation of the child process.
    comp: M
}

impl<M> Process for Spawn<M>
    where M: Process<Item = ()> + 'static
{
    type Item = ();

    /// Creates a new process identifier and delegates to
    /// `spawn_process_using_id_with`. If the parent is already cancelled,
    /// its continuation is revoked instead.
    #[doc(hidden)]
    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
    {
        if is_process_cancelled(&pid, p) {
            return revoke_process(cont, pid, p);
        }

        let Spawn { cancellation, comp } = self;
        let new_id = ProcessId::new().into_process();
        new_id
            .and_then(move |child_id| {
                spawn_process_using_id_with(cancellation, Grc::new(child_id), comp)
            })
            .call_process(cont, pid, p)
    }

    /// The boxed-continuation counterpart of `call_process`.
    #[doc(hidden)]
    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
        if is_process_cancelled(&pid, p) {
            return revoke_process_boxed(cont, pid, p);
        }

        let Spawn { cancellation, comp } = self;
        let new_id = ProcessId::new().into_process();
        new_id
            .and_then(move |child_id| {
                spawn_process_using_id_with(cancellation, Grc::new(child_id), comp)
            })
            .call_process_boxed(cont, pid, p)
    }
}
/// A process computation that runs `comp` as a child process under the
/// identifier `comp_id`, together with a second child process that cancels
/// `comp_id` after holding for `timeout`.
///
/// The resulting item is whatever is found in the shared result cell when
/// the child's `finally` handler runs: `Some(item)` if `comp` has written its
/// result, otherwise the initial `None`.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct TimeoutUsingId<M>
{
// The time interval passed to `hold_process` in the timer process.
timeout: f64,
// The computation to run under the timeout.
comp: M,
// The identifier under which `comp` is spawned; the timer cancels it.
comp_id: Grc<ProcessId>
}
impl<M> Process for TimeoutUsingId<M>
where M: Process + 'static,
M::Item: Clone
{
type Item = Option<M::Item>;
/// Builds and runs the timeout construction; revokes the continuation
/// instead if the current process is already cancelled.
#[doc(hidden)]
fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
{
if is_process_cancelled(&pid, p) {
revoke_process(cont, pid, p)
} else {
let TimeoutUsingId { timeout, comp, comp_id } = self;
let comp_id_clone = comp_id.clone();
// The source through which the child's outcome is delivered to the awaiting parent.
let s = Grc::new(ObservableSource::new());
let s_clone = s.clone();
ProcessId::new()
.into_process()
.and_then(move |timeout_id| {
let timeout_id = Grc::new(timeout_id);
// The timer process: hold for `timeout`, then cancel the computation.
spawn_process_using_id_with(ProcessCancellation::CancelChildAfterParent, timeout_id.clone(), {
hold_process(timeout)
.and_then(move |()| {
ProcessId::cancel(comp_id_clone)
.into_process()
})
})
.and_then(move |()| {
// The result cell: stays `None` unless `comp` completes and writes its item.
let r = Grc::new(RefComp::new(None));
spawn_process_using_id_with(ProcessCancellation::CancelChildAfterParent, comp_id, {
let r_clone = r.clone();
comp.and_then(move |item| {
RefComp::write(r_clone, Some(item))
.into_process()
})
// The finalizer cancels the timer and publishes the content of the result cell.
// NOTE(review): presumably `finally` also runs when the child is cancelled; confirm against its definition.
.finally({
ProcessId::cancel(timeout_id)
.and_then(move |()| {
RefComp::read(r)
.and_then(move |item| {
s_clone.trigger(item)
})
})
.into_process()
})
})
})
})
// The parent waits for the value triggered by the finalizer above.
.and_then(move |()| {
process_await(s.publish())
})
.call_process(cont, pid, p)
}
}
/// The boxed-continuation counterpart of `call_process`; the construction
/// is the same and only the final call differs.
#[doc(hidden)]
fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
if is_process_cancelled(&pid, p) {
revoke_process_boxed(cont, pid, p)
} else {
let TimeoutUsingId { timeout, comp, comp_id } = self;
let comp_id_clone = comp_id.clone();
let s = Grc::new(ObservableSource::new());
let s_clone = s.clone();
ProcessId::new()
.into_process()
.and_then(move |timeout_id| {
let timeout_id = Grc::new(timeout_id);
// The timer process: hold for `timeout`, then cancel the computation.
spawn_process_using_id_with(ProcessCancellation::CancelChildAfterParent, timeout_id.clone(), {
hold_process(timeout)
.and_then(move |()| {
ProcessId::cancel(comp_id_clone)
.into_process()
})
})
.and_then(move |()| {
// The result cell: stays `None` unless `comp` completes and writes its item.
let r = Grc::new(RefComp::new(None));
spawn_process_using_id_with(ProcessCancellation::CancelChildAfterParent, comp_id, {
let r_clone = r.clone();
comp.and_then(move |item| {
RefComp::write(r_clone, Some(item))
.into_process()
})
// The finalizer cancels the timer and publishes the content of the result cell.
.finally({
ProcessId::cancel(timeout_id)
.and_then(move |()| {
RefComp::read(r)
.and_then(move |item| {
s_clone.trigger(item)
})
})
.into_process()
})
})
})
})
// The parent waits for the value triggered by the finalizer above.
.and_then(move |()| {
process_await(s.publish())
})
.call_process_boxed(cont, pid, p)
}
}
}
/// A process computation that runs another computation under a timeout,
/// using a freshly created process identifier for that computation.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct Timeout<M>
{
    /// The timeout interval.
    timeout: f64,
    /// The computation to run under the timeout.
    comp: M
}

impl<M> Process for Timeout<M>
    where M: Process + 'static,
          M::Item: Clone
{
    type Item = Option<M::Item>;

    /// Creates a new process identifier and delegates to
    /// `timeout_process_using_id`. If the current process is already
    /// cancelled, its continuation is revoked instead.
    #[doc(hidden)]
    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
    {
        if is_process_cancelled(&pid, p) {
            return revoke_process(cont, pid, p);
        }

        let Timeout { timeout, comp } = self;
        let new_id = ProcessId::new().into_process();
        new_id
            .and_then(move |comp_id| {
                timeout_process_using_id(timeout, Grc::new(comp_id), comp)
            })
            .call_process(cont, pid, p)
    }

    /// The boxed-continuation counterpart of `call_process`.
    #[doc(hidden)]
    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
        if is_process_cancelled(&pid, p) {
            return revoke_process_boxed(cont, pid, p);
        }

        let Timeout { timeout, comp } = self;
        let new_id = ProcessId::new().into_process();
        new_id
            .and_then(move |comp_id| {
                timeout_process_using_id(timeout, Grc::new(comp_id), comp)
            })
            .call_process_boxed(cont, pid, p)
    }
}
/// A suspended process continuation that can be taken back exactly once
/// with `unfreeze`.
///
/// While frozen, a subscription to the cancellation signal of the process
/// is kept; if that signal arrives first, the continuation is taken by the
/// subscription and `unfreeze` yields `None`.
#[doc(hidden)]
pub struct FrozenProcess<T> {
// The event that disposes the cancellation subscription and returns the continuation if it is still stored.
comp: EventBox<Option<ProcessBoxCont<T>>>
}
impl<T> FrozenProcess<T>
where T: 'static
{
/// Runs the stored event at point `p`, returning the continuation if it
/// has not been taken yet.
#[doc(hidden)]
#[inline]
pub fn unfreeze(self, p: &Point) -> simulation::Result<Option<ProcessBoxCont<T>>> {
let FrozenProcess { comp } = self;
comp.call_box((p,))
}
/// Freezes the continuation `cont` of the process `pid`.
///
/// The continuation is first wrapped by `substitute_process_priority_boxed`
/// with the current priority `p.priority`. Returns an error if subscribing
/// to the cancellation signal fails.
#[doc(hidden)]
pub fn new(cont: ProcessBoxCont<T>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<FrozenProcess<T>> {
let cont = substitute_process_priority_boxed(p.priority, cont);
// `rh` holds the subscription handle, `rc` holds the frozen continuation;
// both are shared between the cancellation observer and the unfreezing event.
let rh: Grc<RefComp<Option<DisposableBox>>> = Grc::new(RefComp::new(None));
let rc = Grc::new(RefComp::new(Some(cont)));
let h = {
let rh = rh.clone();
let rc = rc.clone();
// Only a weak reference is captured by the observer.
let weak_pid = Grc::downgrade(&pid);
pid.cancel_initiating()
.subscribe(cons_observer(move |_, p| {
let pid = weak_pid.upgrade().expect("The process identifier cannot be removed");
// The handle is expected to be present here; `unwrap` panics otherwise.
let h = rh.swap_at(None, p).unwrap();
h.dispose(p)?;
match rc.swap_at(None, p) {
None => Result::Ok(()),
Some(cont) => {
let pid = pid.clone();
// Revoke the continuation in a separate event at the current time,
// and only if the process is really cancelled by then.
enqueue_event(p.time, {
cons_event(move |p| {
let z = is_process_cancelled(&pid, p);
if z {
revoke_process_boxed(cont, pid, p)
} else {
Result::Ok(())
}
}).into_boxed()
}).call_event(p)
}
}
}))
.call_event(p)
};
match h {
Result::Err(e) => Result::Err(e),
Result::Ok(h) => {
rh.write_at(Some(h), p);
// The unfreezing event: drop the subscription and hand back whatever is left in `rc`.
let comp = {
cons_event(move |p| {
let h = rh.swap_at(None, p).unwrap();
h.dispose(p)?;
let cont = rc.swap_at(None, p);
Result::Ok(cont)
}).into_boxed()
};
Result::Ok(FrozenProcess { comp: comp })
}
}
}
/// Freezes the continuation like `new`, but with special handling of
/// preemption at the time of unfreezing.
///
/// If the process `pid` is preempted when unfreezing, the continuation is
/// not returned (`None`); instead the process is put to sleep with `val`
/// via `sleep_process`, and once it is resumed successfully the
/// computation `comp` is run with the original continuation.
#[doc(hidden)]
pub fn with_reentering<M>(cont: ProcessBoxCont<T>, pid: Grc<ProcessId>, val: T, comp: M, p: &Point) -> simulation::Result<FrozenProcess<T>>
where M: Process<Item = T> + 'static
{
let cont = substitute_process_priority_boxed(p.priority, cont);
let rh: Grc<RefComp<Option<DisposableBox>>> = Grc::new(RefComp::new(None));
let rc = Grc::new(RefComp::new(Some(cont)));
// The cancellation subscription is built the same way as in `new`.
let h = {
let rh = rh.clone();
let rc = rc.clone();
let weak_pid = Grc::downgrade(&pid);
pid.cancel_initiating()
.subscribe(cons_observer(move |_, p| {
let pid = weak_pid.upgrade().expect("The process identifier cannot be removed");
let h = rh.swap_at(None, p).unwrap();
h.dispose(p)?;
match rc.swap_at(None, p) {
None => Result::Ok(()),
Some(cont) => {
let pid = pid.clone();
enqueue_event(p.time, {
cons_event(move |p| {
let z = is_process_cancelled(&pid, p);
if z {
revoke_process_boxed(cont, pid, p)
} else {
Result::Ok(())
}
}).into_boxed()
}).call_event(p)
}
}
}))
.call_event(p)
};
match h {
Result::Err(e) => Result::Err(e),
Result::Ok(h) => {
rh.write_at(Some(h), p);
let comp = {
cons_event(move |p| {
let h = rh.swap_at(None, p).unwrap();
h.dispose(p)?;
match rc.swap_at(None, p) {
None => Result::Ok(None),
Some(cont) => {
let f = pid.is_preempted(p);
if !f {
Result::Ok(Some(cont))
} else {
// Preempted: wrap the continuation so that a successful wake-up
// runs `comp` with it, while an error is forwarded unchanged.
let cont = ProcessBoxCont::new(move |a, pid, p| {
match a {
Result::Ok(_) => comp.call_process_boxed(cont, pid, p),
Result::Err(e) => cont.call_box((Result::Err(e), pid, p))
}
});
match sleep_process(cont, pid, val, p) {
Result::Ok(()) => Result::Ok(None),
Result::Err(e) => Result::Err(e)
}
}
}
}
}).into_boxed()
};
Result::Ok(FrozenProcess { comp: comp })
}
}
}
}
/// Resumes the continuation of the process with `val`, taking preemption
/// into account.
///
/// If the process is preempted now, it is put to sleep at once. Otherwise
/// an event is enqueued at the current time that checks the preemption
/// again and then either resumes the continuation or puts it to sleep.
#[doc(hidden)]
pub fn reenter_process<T>(cont: ProcessBoxCont<T>, pid: Grc<ProcessId>, val: T, p: &Point) -> simulation::Result<()>
    where T: 'static
{
    if pid.is_preempted(p) {
        return sleep_process(cont, pid, val, p);
    }

    let reenter = cons_event(move |p| {
        if pid.is_preempted(p) {
            sleep_process(cont, pid, val, p)
        } else {
            resume_process_boxed(cont, pid, val, p)
        }
    });
    enqueue_event(p.time, reenter.into_boxed()).call_event(p)
}
/// Puts the process to sleep until the next event of its observable.
///
/// The continuation `cont` and the value `val` are stored, and a one-shot
/// subscription to `pid.observable()` decides what happens next:
/// on `CancelInitiating` the continuation is revoked (if the process is
/// cancelled by then), on `PreemptionEnding` the process is re-entered with
/// `val`. Returns an error if subscribing fails.
///
/// # Panics
///
/// The observer panics on `PreemptionInitiating`.
#[doc(hidden)]
pub fn sleep_process<T>(cont: ProcessBoxCont<T>, pid: Grc<ProcessId>, val: T, p: &Point) -> simulation::Result<()>
where T: 'static
{
// `rh` holds the subscription handle; `rc` and `rv` hold the continuation and its value.
let rh: Grc<RefComp<Option<DisposableBox>>> = Grc::new(RefComp::new(None));
let rc = Grc::new(RefComp::new(Some(cont)));
let rv = Grc::new(RefComp::new(Some(val)));
let h = {
let rh = rh.clone();
pid.observable()
.subscribe(cons_observer(move |e, p| {
// Unsubscribe on the first event; `unwrap` panics if the handle is missing.
let h = rh.swap_at(None, p).unwrap();
h.dispose(p)?;
match e {
&ProcessEvent::CancelInitiating => {
let pid = pid.clone();
let rc = rc.clone();
// Revoke in a separate event at the current time, only if the process is cancelled by then.
enqueue_event(p.time, {
cons_event(move |p| {
let z = is_process_cancelled(&pid, p);
if z {
let cont = rc.swap_at(None, p).unwrap();
revoke_process_boxed(cont, pid, p)
} else {
Result::Ok(())
}
}).into_boxed()
}).call_event(p)
},
&ProcessEvent::PreemptionEnding => {
let pid = pid.clone();
let rc = rc.clone();
let rv = rv.clone();
// Re-enter the process in a separate event at the current time.
enqueue_event(p.time, {
cons_event(move |p| {
let cont = rc.swap_at(None, p).unwrap();
let val = rv.swap_at(None, p).unwrap();
reenter_process(cont, pid, val, p)
}).into_boxed()
}).call_event(p)
},
&ProcessEvent::PreemptionInitiating => {
panic!("The computation was already preempted")
}
}
}))
.call_event(p)
};
match h {
Result::Err(e) => Result::Err(e),
Result::Ok(h) => {
// Store the handle so that the observer can dispose its own subscription.
rh.write_at(Some(h), p);
Result::Ok(())
}
}
}
/// Creates a boxed continuation that passes a successful value to `f`
/// together with the original continuation, and forwards an error
/// directly to the original continuation.
#[doc(hidden)]
pub fn substitute_process<C, T, F>(cont: C, f: F) -> ProcessBoxCont<T>
    where C: FnOnce(simulation::Result<T>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static,
          T: 'static,
          F: FnOnce(C, Grc<ProcessId>, T, &Point) -> simulation::Result<()> + 'static,
{
    ProcessBoxCont::new(move |outcome, pid, p| {
        match outcome {
            Result::Err(e) => cont(Result::Err(e), pid, p),
            Result::Ok(val) => f(cont, pid, val, p)
        }
    })
}
/// Creates a boxed continuation that passes a successful value to `f`
/// together with the original boxed continuation, and forwards an error
/// directly to the original continuation.
#[doc(hidden)]
pub fn substitute_process_boxed<T, F>(cont: ProcessBoxCont<T>, f: F) -> ProcessBoxCont<T>
    where T: 'static,
          F: FnOnce(ProcessBoxCont<T>, Grc<ProcessId>, T, &Point) -> simulation::Result<()> + 'static
{
    ProcessBoxCont::new(move |outcome, pid, p| {
        match outcome {
            Result::Err(e) => cont.call_box((Result::Err(e), pid, p)),
            Result::Ok(val) => f(cont, pid, val, p)
        }
    })
}
/// Creates a boxed continuation that delivers a successful value to
/// `cont` at the given priority.
///
/// If the current priority already equals `priority`, the continuation is
/// called directly; otherwise its resumption is enqueued at the current
/// time with that priority. An error is forwarded directly.
#[doc(hidden)]
pub fn substitute_process_priority<C, T>(priority: isize, cont: C) -> ProcessBoxCont<T>
    where C: FnOnce(simulation::Result<T>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static,
          T: 'static
{
    ProcessBoxCont::new(move |outcome, pid, p| {
        let val = match outcome {
            Result::Ok(val) => val,
            Result::Err(e) => return cont(Result::Err(e), pid, p)
        };

        if p.priority != priority {
            let resume = cons_event(move |p| {
                resume_process(cont, pid, val, p)
            });
            enqueue_event_with_priority(p.time, priority, resume.into_boxed()).call_event(p)
        } else {
            cont(Result::Ok(val), pid, p)
        }
    })
}
/// Creates a boxed continuation that delivers a successful value to the
/// boxed continuation `cont` at the given priority.
///
/// If the current priority already equals `priority`, the continuation is
/// called directly; otherwise its resumption is enqueued at the current
/// time with that priority. An error is forwarded directly.
#[doc(hidden)]
pub fn substitute_process_priority_boxed<T>(priority: isize, cont: ProcessBoxCont<T>) -> ProcessBoxCont<T>
    where T: 'static
{
    ProcessBoxCont::new(move |outcome, pid, p| {
        let val = match outcome {
            Result::Ok(val) => val,
            Result::Err(e) => return cont.call_box((Result::Err(e), pid, p))
        };

        if p.priority != priority {
            let resume = cons_event(move |p| {
                resume_process_boxed(cont, pid, val, p)
            });
            enqueue_event_with_priority(p.time, priority, resume.into_boxed()).call_event(p)
        } else {
            cont.call_box((Result::Ok(val), pid, p))
        }
    })
}
/// A process computation that waits for the next message of the
/// observable and then re-enters the process with a clone of that message.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct Await<O, M> {
// The observable whose message is awaited.
observable: O,
_phantom: PhantomData<M>
}
impl<O, M> Process for Await<O, M>
where O: Observable<Message = M>,
M: Clone + 'static
{
type Item = M;
/// Boxes the continuation and delegates to `call_process_boxed`.
#[doc(hidden)]
#[inline]
fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
{
self.call_process_boxed(ProcessBoxCont::new(cont), pid, p)
}
/// Freezes the continuation and subscribes to both the observable and
/// the cancellation signal of the process; whichever fires first disposes
/// both subscriptions.
#[doc(hidden)]
fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
match FrozenProcess::new(cont, pid.clone(), p) {
Result::Err(e) => Result::Err(e),
Result::Ok(cont) => {
// `rh` is the handle of the subscription to the observable,
// `rh2` is the handle of the subscription to the cancellation signal.
let rh: Grc<RefComp<Option<DisposableBox>>> = Grc::new(RefComp::new(None));
let rh2: Grc<RefComp<Option<DisposableBox>>> = Grc::new(RefComp::new(None));
let rc = RefComp::new(Some(cont));
let h = {
let rh = rh.clone();
let rh2 = rh2.clone();
let pid = pid.clone();
let Await { observable, _phantom } = self;
observable
.subscribe(cons_observer(move |x: &M, p| {
// A message has arrived: drop both subscriptions first.
if let Some(h) = rh.swap_at(None, p) {
h.dispose(p)?;
}
if let Some(h2) = rh2.swap_at(None, p) {
h2.dispose(p)?;
}
// `unwrap` panics if the frozen continuation has already been taken.
let cont = rc.swap_at(None, p).unwrap();
match cont.unfreeze(p) {
Result::Err(e) => Result::Err(e),
// The continuation is no longer stored in the frozen process; nothing to resume.
Result::Ok(None) => Result::Ok(()),
Result::Ok(Some(cont)) => {
let pid = pid.clone();
reenter_process(cont, pid, x.clone(), p)
}
}
}))
.call_event(p)
};
match h {
Result::Err(e) => return Result::Err(e),
Result::Ok(h) => {
let h2 = {
let rh = rh.clone();
let rh2 = rh2.clone();
pid.cancel_initiating()
.subscribe(cons_observer(move |_, p| {
// Cancellation is initiated: this observer only drops both subscriptions.
if let Some(h) = rh.swap_at(None, p) {
h.dispose(p)?;
}
if let Some(h2) = rh2.swap_at(None, p) {
h2.dispose(p)?;
}
Result::Ok(())
}))
.call_event(p)
};
match h2 {
Result::Err(e) => {
// The second subscription failed: undo the first one before reporting the error.
h.dispose(p)?;
return Result::Err(e)
},
Result::Ok(h2) => {
// Publish the handles so that the observers can dispose them.
rh.write_at(Some(h), p);
rh2.write_at(Some(h2), p);
Result::Ok(())
}
}
}
}
}
}
}
}
/// A process computation that waits for the next message of an
/// observable carrying `simulation::Result` values and continues the
/// process according to that result.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct AwaitResult<O, M> {
// The observable whose message is awaited.
observable: O,
_phantom: PhantomData<M>
}
impl<O, M> Process for AwaitResult<O, M>
where O: Observable<Message = simulation::Result<M>>,
M: Clone + 'static
{
type Item = M;
/// Boxes the continuation and delegates to `call_process_boxed`.
#[doc(hidden)]
#[inline]
fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
{
self.call_process_boxed(ProcessBoxCont::new(cont), pid, p)
}
/// Freezes the continuation and subscribes to both the observable and
/// the cancellation signal of the process; whichever fires first disposes
/// both subscriptions.
#[doc(hidden)]
fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
match FrozenProcess::new(cont, pid.clone(), p) {
Result::Err(e) => Result::Err(e),
Result::Ok(cont) => {
// `rh` is the handle of the subscription to the observable,
// `rh2` is the handle of the subscription to the cancellation signal.
let rh: Grc<RefComp<Option<DisposableBox>>> = Grc::new(RefComp::new(None));
let rh2: Grc<RefComp<Option<DisposableBox>>> = Grc::new(RefComp::new(None));
let rc = RefComp::new(Some(cont));
let h = {
let rh = rh.clone();
let rh2 = rh2.clone();
let pid = pid.clone();
let AwaitResult { observable, _phantom } = self;
observable
.subscribe(cons_observer(move |x: &simulation::Result<M>, p| {
// A message has arrived: drop both subscriptions first.
if let Some(h) = rh.swap_at(None, p) {
h.dispose(p)?;
}
if let Some(h2) = rh2.swap_at(None, p) {
h2.dispose(p)?;
}
// `unwrap` panics if the frozen continuation has already been taken.
let cont = rc.swap_at(None, p).unwrap();
match cont.unfreeze(p) {
Result::Err(e) => Result::Err(e),
// The continuation is no longer stored in the frozen process; nothing to resume.
Result::Ok(None) => Result::Ok(()),
Result::Ok(Some(cont)) => {
let pid = pid.clone();
// Dispatch on the received result.
match x {
Result::Ok(x) => {
reenter_process(cont, pid, x.clone(), p)
},
Result::Err(Error::Cancel) => {
revoke_process_boxed(cont, pid, p)
},
Result::Err(Error::Other(e)) => {
// Retry and panic errors go through `cut_error_process_boxed`,
// I/O errors through `propagate_error_process_boxed`.
match e.deref() {
&OtherError::Retry(_) => {
cut_error_process_boxed(cont, pid, e.clone(), p)
},
&OtherError::Panic(_) => {
cut_error_process_boxed(cont, pid, e.clone(), p)
},
&OtherError::IO(_) => {
propagate_error_process_boxed(cont, pid, e.clone(), p)
}
}
}
}
}
}
}))
.call_event(p)
};
match h {
Result::Err(e) => return Result::Err(e),
Result::Ok(h) => {
let h2 = {
let rh = rh.clone();
let rh2 = rh2.clone();
pid.cancel_initiating()
.subscribe(cons_observer(move |_, p| {
// Cancellation is initiated: this observer only drops both subscriptions.
if let Some(h) = rh.swap_at(None, p) {
h.dispose(p)?;
}
if let Some(h2) = rh2.swap_at(None, p) {
h2.dispose(p)?;
}
Result::Ok(())
}))
.call_event(p)
};
match h2 {
Result::Err(e) => {
// The second subscription failed: undo the first one before reporting the error.
h.dispose(p)?;
return Result::Err(e)
},
Result::Ok(h2) => {
// Publish the handles so that the observers can dispose them.
rh.write_at(Some(h), p);
rh2.write_at(Some(h2), p);
Result::Ok(())
}
}
}
}
}
}
}
}
/// A process computation that transfers control to another computation
/// and never returns to its own continuation.
///
/// The item type `T` is only a marker, as the continuation expecting it is
/// never called with a value by this code.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct Transfer<T, M> {
    /// The computation to which the control is transferred.
    comp: M,
    _phantom: PhantomData<T>
}

impl<T, M> Process for Transfer<T, M>
    where M: Process<Item = ()>
{
    type Item = T;

    /// Runs the nested computation with a final continuation that merely
    /// returns the received result. The supplied continuation is used only
    /// for revoking when the process is already cancelled.
    #[doc(hidden)]
    #[inline]
    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
    {
        /// The final continuation: returns the result as is.
        fn finish(a: simulation::Result<()>, _pid: Grc<ProcessId>, _p: &Point) -> simulation::Result<()> {
            a
        }

        if is_process_cancelled(&pid, p) {
            return revoke_process(cont, pid, p);
        }
        self.comp.call_process(finish, pid, p)
    }

    /// The boxed-continuation counterpart of `call_process`.
    #[doc(hidden)]
    #[inline]
    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
        /// The final continuation: returns the result as is.
        fn finish(a: simulation::Result<()>, _pid: Grc<ProcessId>, _p: &Point) -> simulation::Result<()> {
            a
        }

        if is_process_cancelled(&pid, p) {
            return revoke_process_boxed(cont, pid, p);
        }
        self.comp.call_process(finish, pid, p)
    }
}
/// A process computation that never calls its continuation.
///
/// Running it returns `Ok(())` at once and drops the continuation.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct Never<T> {
    _phantom: PhantomData<T>
}

impl<T> Process for Never<T> {
    type Item = T;

    /// Ignores all arguments and reports success.
    #[doc(hidden)]
    #[inline]
    fn call_process<C>(self, _cont: C, _pid: Grc<ProcessId>, _p: &Point) -> simulation::Result<()>
        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
    {
        Result::Ok(())
    }

    /// Ignores all arguments and reports success.
    #[doc(hidden)]
    #[inline]
    fn call_process_boxed(self, _cont: ProcessBoxCont<Self::Item>, _pid: Grc<ProcessId>, _p: &Point) -> simulation::Result<()> {
        Result::Ok(())
    }
}
/// A process computation that registers an observer to be notified when
/// the cancellation of the current process is initiated.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct WhenCancelling<M> {
    /// The observer to call with `&()` on the cancellation signal.
    comp: M
}

impl<M> Process for WhenCancelling<M>
    where M: Observer<Message = (), Item = ()> + 'static
{
    type Item = ();

    /// Subscribes the observer to the cancellation signal of the process
    /// and then resumes the continuation. The subscription handle is not
    /// kept. If the process is already cancelled, the continuation is
    /// revoked and nothing is subscribed.
    #[doc(hidden)]
    #[inline]
    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
    {
        if is_process_cancelled(&pid, p) {
            return revoke_process(cont, pid, p);
        }

        let WhenCancelling { comp } = self;
        pid.cancel_initiating()
            .subscribe(cons_observer(move |_, p| {
                comp.call_observer(&(), p)
            }))
            .call_event(p)?;

        resume_process(cont, pid, (), p)
    }

    /// The boxed-continuation counterpart of `call_process`.
    #[doc(hidden)]
    #[inline]
    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
        if is_process_cancelled(&pid, p) {
            return revoke_process_boxed(cont, pid, p);
        }

        let WhenCancelling { comp } = self;
        pid.cancel_initiating()
            .subscribe(cons_observer(move |_, p| {
                comp.call_observer(&(), p)
            }))
            .call_event(p)?;

        resume_process_boxed(cont, pid, (), p)
    }
}
/// A process computation that fails with a panic error carrying the
/// specified message.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct Panic<T> {
    /// The message of the panic error.
    msg: String,
    _phantom: PhantomData<T>
}

impl<T> Process for Panic<T> {
    type Item = T;

    /// Builds an `OtherError::Panic` from the message and passes it to
    /// `cut_error_process`. If the process is already cancelled, the
    /// continuation is revoked instead.
    #[doc(hidden)]
    #[inline]
    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
    {
        if is_process_cancelled(&pid, p) {
            return revoke_process(cont, pid, p);
        }

        let Panic { msg, .. } = self;
        let err = Rc::new(OtherError::Panic(msg));
        cut_error_process(cont, pid, err, p)
    }

    /// The boxed-continuation counterpart of `call_process`.
    #[doc(hidden)]
    #[inline]
    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
        if is_process_cancelled(&pid, p) {
            return revoke_process_boxed(cont, pid, p);
        }

        let Panic { msg, .. } = self;
        let err = Rc::new(OtherError::Panic(msg));
        cut_error_process_boxed(cont, pid, err, p)
    }
}
/// A process computation that passes a message to `Point::trace` and then
/// runs the wrapped computation with the same continuation.
///
/// If the process is already cancelled when the computation is run, the
/// process is revoked; nothing is traced and the wrapped computation is not
/// started.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct Trace<M> {
    /// The wrapped computation to run after tracing.
    comp: M,
    /// The message handed to `p.trace`.
    msg: String
}

impl<M> Process for Trace<M>
    where M: Process
{
    type Item = M::Item;

    #[doc(hidden)]
    #[inline]
    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
    {
        if is_process_cancelled(&pid, p) {
            return revoke_process(cont, pid, p);
        }

        let Trace { comp: inner, msg: text } = self;

        // Trace before delegating, so the message precedes the inner computation.
        p.trace(&text);
        inner.call_process(cont, pid, p)
    }

    #[doc(hidden)]
    #[inline]
    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
        if is_process_cancelled(&pid, p) {
            return revoke_process_boxed(cont, pid, p);
        }

        let Trace { comp: inner, msg: text } = self;

        // Trace before delegating, so the message precedes the inner computation.
        p.trace(&text);
        inner.call_process_boxed(cont, pid, p)
    }
}
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct ProcessWithPriority {
priority: isize
}
impl Process for ProcessWithPriority {
type Item = ();
#[doc(hidden)]
#[inline]
fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
{
if is_process_cancelled(&pid, p) {
revoke_process(cont, pid, p)
} else {
let ProcessWithPriority { priority } = self;
if priority == p.priority {
cont(Result::Ok(()), pid, p)
} else {
let comp = cons_event(move |p| {
if is_process_cancelled(&pid, p) {
revoke_process(cont, pid, p)
} else {
cont(Result::Ok(()), pid, p)
}
});
enqueue_event_with_priority(p.time, priority, comp.into_boxed())
.call_event(p)
}
}
}
#[doc(hidden)]
#[inline]
fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
if is_process_cancelled(&pid, p) {
revoke_process_boxed(cont, pid, p)
} else {
let ProcessWithPriority { priority } = self;
if priority == p.priority {
cont.call_box((Result::Ok(()), pid, p))
} else {
let comp = cons_event(move |p| {
if is_process_cancelled(&pid, p) {
revoke_process_boxed(cont, pid, p)
} else {
cont.call_box((Result::Ok(()), pid, p))
}
});
enqueue_event_with_priority(p.time, priority, comp.into_boxed())
.call_event(p)
}
}
}
}
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct EmbedResult<T> {
val: simulation::Result<T>
}
impl<T> Process for EmbedResult<T> {
type Item = T;
#[doc(hidden)]
#[inline]
fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
{
if is_process_cancelled(&pid, p) {
revoke_process(cont, pid, p)
} else {
let EmbedResult { val } = self;
match val {
Result::Ok(a) => {
cont(Result::Ok(a), pid, p)
},
Result::Err(Error::Cancel) => {
let comp = cancel_process();
comp.call_process(cont, pid, p)
},
Result::Err(Error::Other(e)) => {
match e.deref() {
&OtherError::Retry(_) => {
cut_error_process(cont, pid, e, p)
},
&OtherError::Panic(_) => {
cut_error_process(cont, pid, e, p)
},
&OtherError::IO(_) => {
propagate_error_process(cont, pid, e, p)
}
}
}
}
}
}
#[doc(hidden)]
#[inline]
fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
if is_process_cancelled(&pid, p) {
revoke_process_boxed(cont, pid, p)
} else {
let EmbedResult { val } = self;
match val {
Result::Ok(a) => {
cont.call_box((Result::Ok(a), pid, p))
},
Result::Err(Error::Cancel) => {
let comp = cancel_process();
comp.call_process_boxed(cont, pid, p)
},
Result::Err(Error::Other(e)) => {
match e.deref() {
&OtherError::Retry(_) => {
cut_error_process_boxed(cont, pid, e, p)
},
&OtherError::Panic(_) => {
cut_error_process_boxed(cont, pid, e, p)
},
&OtherError::IO(_) => {
propagate_error_process_boxed(cont, pid, e, p)
}
}
}
}
}
}
}