use std::rc::Rc;
use std::marker::PhantomData;
use dvcompute::simulation;
use dvcompute::simulation::error::*;
use dvcompute::simulation::Point;
use dvcompute::simulation::ref_comp::RefComp;
use dvcompute::simulation::observable::*;
use dvcompute::simulation::observable::source::*;
use dvcompute::simulation::event::*;
use dvcompute::simulation::process::*;
use dvcompute::simulation::strategy::QueueStorage;
use dvcompute_utils::simulation::stats::*;
use crate::simulation::transact::*;
use crate::simulation::strategy::*;
use crate::simulation::block::*;
use crate::simulation::block::basic::*;
pub struct Facility<T> {
count: RefComp<isize>,
count_stats: RefComp<TimingStats<isize>>,
count_source: ObservableSource<isize>,
capture_count: RefComp<isize>,
capture_count_source: ObservableSource<isize>,
util_count: RefComp<isize>,
util_count_stats: RefComp<TimingStats<isize>>,
util_count_source: ObservableSource<isize>,
queue_count: RefComp<isize>,
queue_count_stats: RefComp<TimingStats<isize>>,
queue_count_source: ObservableSource<isize>,
total_wait_time: RefComp<f64>,
wait_time: RefComp<SamplingStats<f64>>,
wait_time_source: ObservableSource<SamplingStats<f64>>,
total_holding_time: RefComp<f64>,
holding_time: RefComp<SamplingStats<f64>>,
holding_time_source: ObservableSource<SamplingStats<f64>>,
owner: RefComp<Option<Rc<FacilityOwnerItem<T>>>>,
delay_chain: FCFSStorage<FacilityDelayedItem<T>>,
interrupt_chain: LCFSStorage<FacilityInterruptedItem<T>>,
pending_chain: FCFSStorage<FacilityPendingItem<T>>
}
impl<T> PartialEq for Facility<T> {
fn eq(&self, other: &Self) -> bool {
self.count == other.count
}
}
impl<T> Eq for Facility<T> {}
struct FacilityOwnerItem<T> {
transact: Rc<Transact<T>>,
init_priority: isize,
time: f64,
preempting: bool,
interrupting: bool,
acc_holding_time: f64
}
struct FacilityDelayedItem<T> {
transact: Rc<Transact<T>>,
init_priority: isize,
time: f64,
preempting: bool,
interrupting: bool,
cont: FrozenProcess<()>
}
struct FacilityInterruptedItem<T> {
transact: Rc<Transact<T>>,
init_priority: isize,
time: f64,
preempting: bool,
interrupting: bool,
remaining_time: Option<f64>,
transfer: Option<FacilityPreemptTransfer<T>>,
acc_holding_time: f64
}
struct FacilityPendingItem<T> {
transact: Rc<Transact<T>>,
init_priority: isize,
time: f64,
preempting: bool,
interrupting: bool,
cont: FrozenProcess<()>
}
#[derive(Clone)]
pub struct FacilityPreemptMode<T> {
pub priority_mode: bool,
pub transfer: Option<FacilityPreemptTransfer<T>>,
pub remove_mode: bool
}
impl<T> From<PreemptBlockMode<T>> for FacilityPreemptMode<T>
where T: 'static
{
fn from(mode: PreemptBlockMode<T>) -> Self {
let PreemptBlockMode { priority_mode, transfer, remove_mode } = mode;
let transfer = match transfer {
None => None,
Some(f) => Some({
FacilityPreemptTransfer::new(move |transact, dt| {
f.call_box(dt).run(transact).into_boxed()
})
})
};
FacilityPreemptMode {
priority_mode: priority_mode,
transfer: transfer,
remove_mode: remove_mode
}
}
}
pub struct FacilityPreemptTransfer<T> {
f: Box<dyn FacilityPreemptTransferFnBoxClone<T>>
}
impl<T> FacilityPreemptTransfer<T> {
#[inline]
pub fn new<F>(f: F) -> Self
where F: FnOnce(Rc<Transact<T>>, Option<f64>) -> ProcessBox<()> + Clone + 'static
{
FacilityPreemptTransfer {
f: Box::new(f)
}
}
#[inline]
pub fn call_box(self, arg: (Rc<Transact<T>>, Option<f64>)) -> ProcessBox<()> {
let FacilityPreemptTransfer { f } = self;
f.call_box(arg)
}
}
impl<T> Clone for FacilityPreemptTransfer<T> {
#[inline]
fn clone(&self) -> Self {
FacilityPreemptTransfer {
f: self.f.call_clone()
}
}
}
trait FacilityPreemptTransferFnBox<T> {
fn call_box(self: Box<Self>, args: (Rc<Transact<T>>, Option<f64>)) -> ProcessBox<()>;
}
impl<T, F> FacilityPreemptTransferFnBox<T> for F
where F: FnOnce(Rc<Transact<T>>, Option<f64>) -> ProcessBox<()>
{
fn call_box(self: Box<Self>, args: (Rc<Transact<T>>, Option<f64>)) -> ProcessBox<()> {
let this: Self = *self;
this(args.0, args.1)
}
}
trait FacilityPreemptTransferFnBoxClone<T>: FacilityPreemptTransferFnBox<T> {
fn call_clone(&self) -> Box<dyn FacilityPreemptTransferFnBoxClone<T>>;
}
impl<T, F> FacilityPreemptTransferFnBoxClone<T> for F
where F: FnOnce(Rc<Transact<T>>, Option<f64>) -> ProcessBox<()> + Clone + 'static
{
fn call_clone(&self) -> Box<dyn FacilityPreemptTransferFnBoxClone<T>> {
Box::new(self.clone())
}
}
impl<T> Facility<T> {
#[inline]
pub fn new() -> NewFacility<T> {
NewFacility { _phantom: PhantomData }
}
#[inline]
pub fn count(facility: Rc<Self>) -> impl Event<Item = isize> + Clone {
cons_event(move |p| {
Result::Ok(facility.count.read_at(p))
})
}
#[inline]
pub fn count_stats(facility: Rc<Self>) -> impl Event<Item = TimingStats<isize>> + Clone {
cons_event(move |p| {
Result::Ok(facility.count_stats.read_at(p))
})
}
#[inline]
pub fn count_changed(&self) -> impl Observable<Message = isize> + Clone {
self.count_source.publish()
}
#[inline]
pub fn count_changed_(&self) -> impl Observable<Message = ()> + Clone {
self.count_changed().map(move |_| { () })
}
#[inline]
pub fn capture_count(facility: Rc<Self>) -> impl Event<Item = isize> + Clone {
cons_event(move |p| {
Result::Ok(facility.capture_count.read_at(p))
})
}
#[inline]
pub fn capture_count_changed(&self) -> impl Observable<Message = isize> + Clone {
self.capture_count_source.publish()
}
#[inline]
pub fn capture_count_changed_(&self) -> impl Observable<Message = ()> + Clone {
self.capture_count_changed().map(move |_| { () })
}
#[inline]
pub fn util_count(facility: Rc<Self>) -> impl Event<Item = isize> + Clone {
cons_event(move |p| {
Result::Ok(facility.util_count.read_at(p))
})
}
#[inline]
pub fn util_count_stats(facility: Rc<Self>) -> impl Event<Item = TimingStats<isize>> + Clone {
cons_event(move |p| {
Result::Ok(facility.util_count_stats.read_at(p))
})
}
#[inline]
pub fn util_count_changed(&self) -> impl Observable<Message = isize> + Clone {
self.util_count_source.publish()
}
#[inline]
pub fn util_count_changed_(&self) -> impl Observable<Message = ()> + Clone {
self.util_count_changed().map(move |_| { () })
}
#[inline]
pub fn queue_count(facility: Rc<Self>) -> impl Event<Item = isize> + Clone {
cons_event(move |p| {
Result::Ok(facility.queue_count.read_at(p))
})
}
#[inline]
pub fn queue_count_stats(facility: Rc<Self>) -> impl Event<Item = TimingStats<isize>> + Clone {
cons_event(move |p| {
Result::Ok(facility.queue_count_stats.read_at(p))
})
}
#[inline]
pub fn queue_count_changed(&self) -> impl Observable<Message = isize> + Clone {
self.queue_count_source.publish()
}
#[inline]
pub fn queue_count_changed_(&self) -> impl Observable<Message = ()> + Clone {
self.queue_count_changed().map(move |_| { () })
}
#[inline]
pub fn total_wait_time(facility: Rc<Self>) -> impl Event<Item = f64> + Clone {
cons_event(move |p| {
Result::Ok(facility.total_wait_time.read_at(p))
})
}
#[inline]
pub fn wait_time(facility: Rc<Self>) -> impl Event<Item = SamplingStats<f64>> + Clone {
cons_event(move |p| {
Result::Ok(facility.wait_time.read_at(p))
})
}
#[inline]
pub fn wait_time_changed(&self) -> impl Observable<Message = SamplingStats<f64>> + Clone {
self.wait_time_source.publish()
}
#[inline]
pub fn wait_time_changed_(&self) -> impl Observable<Message = ()> + Clone {
self.wait_time_changed().map(move |_| { () })
}
#[inline]
pub fn total_holding_time(facility: Rc<Self>) -> impl Event<Item = f64> + Clone {
cons_event(move |p| {
Result::Ok(facility.total_holding_time.read_at(p))
})
}
#[inline]
pub fn holding_time(facility: Rc<Self>) -> impl Event<Item = SamplingStats<f64>> + Clone {
cons_event(move |p| {
Result::Ok(facility.holding_time.read_at(p))
})
}
#[inline]
pub fn holding_time_changed(&self) -> impl Observable<Message = SamplingStats<f64>> + Clone {
self.holding_time_source.publish()
}
#[inline]
pub fn holding_time_changed_(&self) -> impl Observable<Message = ()> + Clone {
self.holding_time_changed().map(move |_| { () })
}
#[inline]
pub fn is_interrupted(facility: Rc<Self>) -> impl Event<Item = bool> + Clone {
cons_event(move |p| {
match facility.owner.read_at(p) {
None => Result::Ok(false),
Some(owner) => Result::Ok(owner.preempting)
}
})
}
#[inline]
fn update_count(&self, delta: isize, p: &Point) -> simulation::Result<()> {
let a = self.count.read_at(p);
let a2 = a + delta;
let stats = self.count_stats.read_at(p);
let stats2 = stats.add(p.time, a2);
self.count.write_at(a2, p);
self.count_stats.write_at(stats2, p);
self.count_source.trigger_at(&a2, p)
}
#[inline]
fn update_capture_count(&self, delta: isize, p: &Point) -> simulation::Result<()> {
let a = self.capture_count.read_at(p);
let a2 = a + delta;
self.capture_count.write_at(a2, p);
self.capture_count_source.trigger_at(&a2, p)
}
#[inline]
fn update_queue_count(&self, delta: isize, p: &Point) -> simulation::Result<()> {
let a = self.queue_count.read_at(p);
let a2 = a + delta;
let stats = self.queue_count_stats.read_at(p);
let stats2 = stats.add(p.time, a2);
self.queue_count.write_at(a2, p);
self.queue_count_stats.write_at(stats2, p);
self.queue_count_source.trigger_at(&a2, p)
}
#[inline]
fn update_util_count(&self, delta: isize, p: &Point) -> simulation::Result<()> {
let a = self.util_count.read_at(p);
let a2 = a + delta;
let stats = self.util_count_stats.read_at(p);
let stats2 = stats.add(p.time, a2);
self.util_count.write_at(a2, p);
self.util_count_stats.write_at(stats2, p);
self.util_count_source.trigger_at(&a2, p)
}
#[inline]
fn update_wait_time(&self, delta: f64, p: &Point) -> simulation::Result<()> {
let a = self.total_wait_time.read_at(p);
let a2 = a + delta;
let stats = self.wait_time.read_at(p);
let stats2 = stats.add(delta);
self.total_wait_time.write_at(a2, p);
self.wait_time.write_at(stats2, p);
self.wait_time_source.trigger_at(&stats2, p)
}
#[inline]
fn update_holding_time(&self, delta: f64, p: &Point) -> simulation::Result<()> {
let a = self.total_holding_time.read_at(p);
let a2 = a + delta;
let stats = self.holding_time.read_at(p);
let stats2 = stats.add(delta);
self.total_holding_time.write_at(a2, p);
self.holding_time.write_at(stats2, p);
self.holding_time_source.trigger_at(&stats2, p)
}
#[inline]
pub fn changed_(&self) -> impl Observable<Message = ()> + Clone {
self.count_changed_()
.merge(self.capture_count_changed_())
.merge(self.util_count_changed_())
.merge(self.queue_count_changed_())
}
#[inline]
pub fn reset(facility: Rc<Self>) -> impl Event<Item = ()> + Clone {
cons_event(move |p| {
let t = p.time;
let count = facility.count.read_at(p);
let util_count = facility.util_count.read_at(p);
let queue_count = facility.queue_count.read_at(p);
facility.count_stats.write_at(TimingStats::from_sample(t, count), p);
facility.capture_count.write_at(0, p);
facility.util_count_stats.write_at(TimingStats::from_sample(t, util_count), p);
facility.queue_count_stats.write_at(TimingStats::from_sample(t, queue_count), p);
facility.total_wait_time.write_at(0.0, p);
facility.wait_time.write_at(SamplingStats::empty(), p);
facility.total_holding_time.write_at(0.0, p);
facility.holding_time.write_at(SamplingStats::empty(), p);
facility.count_source.trigger_at(&count, p)?;
facility.capture_count_source.trigger_at(&0, p)?;
facility.util_count_source.trigger_at(&util_count, p)?;
facility.queue_count_source.trigger_at(&queue_count, p)?;
facility.wait_time_source.trigger_at(&SamplingStats::empty(), p)?;
facility.holding_time_source.trigger_at(&SamplingStats::empty(), p)
})
}
}
#[derive(Clone)]
pub struct NewFacility<T> {
_phantom: PhantomData<T>
}
impl<T> Event for NewFacility<T> {
type Item = Facility<T>;
#[doc(hidden)]
#[inline]
fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
let t = p.time;
Result::Ok(Facility {
count: RefComp::new(1),
count_stats: RefComp::new(TimingStats::from_sample(t, 1)),
count_source: ObservableSource::new(),
capture_count: RefComp::new(0),
capture_count_source: ObservableSource::new(),
util_count: RefComp::new(0),
util_count_stats: RefComp::new(TimingStats::from_sample(t, 0)),
util_count_source: ObservableSource::new(),
queue_count: RefComp::new(0),
queue_count_stats: RefComp::new(TimingStats::from_sample(t, 0)),
queue_count_source: ObservableSource::new(),
total_wait_time: RefComp::new(0.0),
wait_time: RefComp::new(SamplingStats::empty()),
wait_time_source: ObservableSource::new(),
total_holding_time: RefComp::new(0.0),
holding_time: RefComp::new(SamplingStats::empty()),
holding_time_source: ObservableSource::new(),
owner: RefComp::new(None),
delay_chain: FCFSStorage::new(),
interrupt_chain: LCFSStorage::new(),
pending_chain: FCFSStorage::new()
})
}
}
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct Seize<T> {
facility: Rc<Facility<T>>,
transact: Rc<Transact<T>>
}
impl<T> Process for Seize<T>
where T: 'static
{
type Item = ();
#[doc(hidden)]
#[inline]
fn call_process<C>(self, cont: C, pid: Rc<ProcessId>, p: &Point) -> simulation::Result<()>
where C: FnOnce(simulation::Result<Self::Item>, Rc<ProcessId>, &Point) -> simulation::Result<()> + 'static
{
let cont = ProcessBoxCont::new(cont);
self.call_process_boxed(cont, pid, p)
}
#[doc(hidden)]
fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Rc<ProcessId>, p: &Point) -> simulation::Result<()> {
let Seize { facility, transact } = self;
let t = p.time;
let f = {
facility.delay_chain.is_empty(p)
&& facility.interrupt_chain.is_empty(p)
&& facility.pending_chain.is_empty(p)
};
if f {
seize_free_facility_boxed(facility, transact, cont, pid, p)
} else {
let comp = seize_facility(facility.clone(), transact.clone());
let cont = FrozenProcess::with_reentering(cont, pid, (), comp, p)?;
let init_priority = Transact::priority_at(&transact, p);
let item = FacilityDelayedItem {
transact: transact,
init_priority: init_priority,
time: t,
preempting: false,
interrupting: false,
cont: cont
};
facility.delay_chain.push_with_priority(init_priority, item, p);
facility.update_queue_count(1, p)
}
}
}
#[inline]
pub fn seize_facility<T>(facility: Rc<Facility<T>>, transact: Rc<Transact<T>>) -> Seize<T> {
Seize { facility: facility, transact: transact }
}
fn seize_free_facility_boxed<T>(facility: Rc<Facility<T>>, transact: Rc<Transact<T>>,
cont: ProcessBoxCont<()>, pid: Rc<ProcessId>, p: &Point) -> simulation::Result<()>
where T: 'static
{
let t = p.time;
let init_priority = Transact::priority_at(&transact, p);
match facility.owner.read_at(p) {
None => {
let item = FacilityOwnerItem {
transact: transact,
init_priority: init_priority,
time: t,
preempting: false,
interrupting: false,
acc_holding_time: 0.0
};
let item = Rc::new(item);
facility.owner.write_at(Some(item), p);
facility.update_wait_time(0.0, p)?;
facility.update_count(-1, p)?;
facility.update_capture_count(1, p)?;
facility.update_util_count(1, p)?;
resume_process_boxed(cont, pid, (), p)
},
Some(_owner) => {
let comp = seize_facility(facility.clone(), transact.clone());
let cont = FrozenProcess::with_reentering(cont, pid, (), comp, p)?;
let item = FacilityDelayedItem {
transact: transact,
init_priority: init_priority,
time: t,
preempting: false,
interrupting: false,
cont: cont
};
facility.delay_chain.push_with_priority(init_priority, item, p);
facility.update_queue_count(1, p)
}
}
}
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct Preempt<T> {
facility: Rc<Facility<T>>,
transact: Rc<Transact<T>>,
mode: FacilityPreemptMode<T>
}
impl<T> Process for Preempt<T>
where T: 'static
{
type Item = ();
#[doc(hidden)]
#[inline]
fn call_process<C>(self, cont: C, pid: Rc<ProcessId>, p: &Point) -> simulation::Result<()>
where C: FnOnce(simulation::Result<Self::Item>, Rc<ProcessId>, &Point) -> simulation::Result<()> + 'static
{
let cont = ProcessBoxCont::new(cont);
self.call_process_boxed(cont, pid, p)
}
#[doc(hidden)]
fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Rc<ProcessId>, p: &Point) -> simulation::Result<()> {
let Preempt { facility, transact, mode } = self;
let t = p.time;
let init_priority = Transact::priority_at(&transact, p);
match facility.owner.read_at(p) {
None => {
let item = FacilityOwnerItem {
transact: transact,
init_priority: init_priority,
time: t,
preempting: true,
interrupting: false,
acc_holding_time: 0.0
};
let item = Rc::new(item);
facility.owner.write_at(Some(item), p);
facility.update_wait_time(0.0, p)?;
facility.update_count(-1, p)?;
facility.update_capture_count(1, p)?;
facility.update_util_count(1, p)?;
resume_process_boxed(cont, pid, (), p)
},
Some(ref owner) if !mode.priority_mode && owner.interrupting => {
let comp = preempt_facility(facility.clone(), transact.clone(), mode);
let cont = FrozenProcess::with_reentering(cont, pid, (), comp, p)?;
let item = FacilityPendingItem {
transact: transact,
init_priority: init_priority,
time: t,
preempting: true,
interrupting: true,
cont: cont
};
facility.pending_chain.push_with_priority(init_priority, item, p);
facility.update_queue_count(1, p)
},
Some(ref owner) if mode.priority_mode && init_priority <= owner.init_priority => {
let comp = preempt_facility(facility.clone(), transact.clone(), mode);
let cont = FrozenProcess::with_reentering(cont, pid, (), comp, p)?;
let item = FacilityDelayedItem {
transact: transact,
init_priority: init_priority,
time: t,
preempting: true,
interrupting: true,
cont: cont
};
facility.delay_chain.push_with_priority(init_priority, item, p);
facility.update_queue_count(1, p)
},
Some(ref owner) if !mode.remove_mode => {
let owner = owner.clone();
let item = FacilityOwnerItem {
transact: transact,
init_priority: init_priority,
time: t,
preempting: true,
interrupting: true,
acc_holding_time: 0.0
};
let item = Rc::new(item);
facility.owner.write_at(Some(item), p);
let pid0 = owner.transact.require_process_id(p)?;
let t2 = ProcessId::interruption_time(pid0).call_event(p)?;
let dt0 = match t2 {
None => None,
Some(t2) => Some(t2 - t)
};
let priority0 = owner.init_priority;
let item = FacilityInterruptedItem {
transact: owner.transact.clone(),
init_priority: priority0,
time: t,
preempting: owner.preempting,
interrupting: owner.interrupting,
remaining_time: dt0,
transfer: mode.transfer,
acc_holding_time: owner.acc_holding_time + (t - owner.time)
};
facility.interrupt_chain.push_with_priority(priority0, item, p);
facility.update_queue_count(1, p)?;
facility.update_wait_time(0.0, p)?;
facility.update_capture_count(1, p)?;
Transact::begin_preemption_at(&owner.transact, p)?;
resume_process_boxed(cont, pid, (), p)
},
Some(ref owner) => {
let owner = owner.clone();
let item = FacilityOwnerItem {
transact: transact,
init_priority: init_priority,
time: t,
preempting: true,
interrupting: true,
acc_holding_time: 0.0
};
let item = Rc::new(item);
facility.owner.write_at(Some(item), p);
let pid0 = owner.transact.require_process_id(p)?;
let t2 = ProcessId::interruption_time(pid0).call_event(p)?;
let dt0 = match t2 {
None => None,
Some(t2) => Some(t2 - t)
};
facility.update_wait_time(0.0, p)?;
facility.update_capture_count(1, p)?;
facility.update_holding_time(owner.acc_holding_time + (t - owner.time), p)?;
match mode.transfer {
None => {
let msg = String::from("The destination is not specified for the removed preempted transact");
let err = Error::retry(msg);
Result::Err(err)
},
Some(transfer) => {
let comp = transfer.call_box((owner.transact.clone(), dt0));
Transact::transfer(owner.transact.clone(), comp, p)?;
resume_process_boxed(cont, pid, (), p)
}
}
}
}
}
}
#[inline]
pub fn preempt_facility<T>(facility: Rc<Facility<T>>, transact: Rc<Transact<T>>,
mode: FacilityPreemptMode<T>) -> Preempt<T>
{
Preempt { facility: facility, transact: transact, mode: mode }
}
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct Release<T> {
facility: Rc<Facility<T>>,
transact: Rc<Transact<T>>,
preempting: Option<bool>
}
impl<T> Process for Release<T>
where T: 'static
{
type Item = ();
#[doc(hidden)]
#[inline]
fn call_process<C>(self, cont: C, pid: Rc<ProcessId>, p: &Point) -> simulation::Result<()>
where C: FnOnce(simulation::Result<Self::Item>, Rc<ProcessId>, &Point) -> simulation::Result<()> + 'static
{
let cont = ProcessBoxCont::new(cont);
self.call_process_boxed(cont, pid, p)
}
#[doc(hidden)]
fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Rc<ProcessId>, p: &Point) -> simulation::Result<()> {
let Release { facility, transact, preempting } = self;
let t = p.time;
match facility.owner.read_at(p) {
None => {
let msg = String::from("There is no owner of the facility");
let err = Error::retry(msg);
Result::Err(err)
},
Some(ref owner) if owner.transact == transact && owner.preempting != preempting.unwrap_or(owner.preempting) => {
let msg = String::from("Mismatch use of returning and releasing the facility");
let err = Error::retry(msg);
Result::Err(err)
},
Some(ref owner) if owner.transact == transact => {
facility.owner.write_at(None, p);
facility.update_util_count(-1, p)?;
facility.update_holding_time(owner.acc_holding_time + (t - owner.time), p)?;
facility.update_count(1, p)?;
enqueue_event(t, try_capture_facility(facility).into_boxed()).call_event(p)?;
resume_process_boxed(cont, pid, (), p)
},
Some(_) => {
let msg = String::from("The facility has another owner");
let err = Error::retry(msg);
Result::Err(err)
}
}
}
}
type Return<T> = Release<T>;
#[inline]
pub fn return_facility<T>(facility: Rc<Facility<T>>, transact: Rc<Transact<T>>) -> Return<T> {
Return { facility: facility, transact: transact, preempting: Some(true) }
}
#[inline]
pub fn release_facility<T>(facility: Rc<Facility<T>>, transact: Rc<Transact<T>>) -> Release<T> {
Release { facility: facility, transact: transact, preempting: Some(false) }
}
#[inline]
pub fn release_or_return_facility<T>(facility: Rc<Facility<T>>, transact: Rc<Transact<T>>) -> Release<T> {
Release { facility: facility, transact: transact, preempting: None }
}
#[inline]
fn try_capture_facility<T>(facility: Rc<Facility<T>>) -> impl Event<Item = ()> + Clone
where T: 'static
{
cons_event(move |p| {
match facility.owner.read_at(p) {
None => capture_facility(facility, p),
Some(_) => Result::Ok(())
}
})
}
fn capture_facility<T>(facility: Rc<Facility<T>>, p: &Point) -> simulation::Result<()>
where T: 'static
{
let t = p.time;
if !facility.pending_chain.is_empty(p) {
let FacilityPendingItem { transact, init_priority, time: t0, preempting, interrupting, cont: cont0 } = {
facility.pending_chain.pop(p).unwrap()
};
facility.update_queue_count(-1, p)?;
match cont0.unfreeze(p)? {
None => capture_facility(facility, p),
Some(cont) => {
let pid = transact.require_process_id(p)?;
let item = FacilityOwnerItem {
transact: transact,
init_priority: init_priority,
time: t,
preempting: preempting,
interrupting: interrupting,
acc_holding_time: 0.0
};
let item = Rc::new(item);
facility.owner.write_at(Some(item), p);
facility.update_wait_time(t - t0, p)?;
facility.update_util_count(1, p)?;
facility.update_capture_count(1, p)?;
facility.update_count(-1, p)?;
enqueue_event(t, {
cons_event(move |p| {
reenter_process(cont, pid, (), p)
}).into_boxed()
}).call_event(p)
}
}
} else if !facility.interrupt_chain.is_empty(p) {
let FacilityInterruptedItem { transact, init_priority, time: t0, preempting, interrupting,
remaining_time: dt0, transfer: transfer0, acc_holding_time: acc0 } = {
facility.interrupt_chain.pop(p).unwrap()
};
facility.update_queue_count(-1, p)?;
let pid = transact.require_process_id(p)?;
if pid.is_cancel_initiated_at(p) {
capture_facility(facility, p)
} else {
let item = FacilityOwnerItem {
transact: transact.clone(),
init_priority: init_priority,
time: t,
preempting: preempting,
interrupting: interrupting,
acc_holding_time: acc0
};
let item = Rc::new(item);
facility.owner.write_at(Some(item), p);
facility.update_wait_time(t - t0, p)?;
facility.update_util_count(1, p)?;
facility.update_count(-1, p)?;
match transfer0 {
None => Result::Ok(()),
Some(transfer) => {
let comp = transfer.call_box((transact.clone(), dt0));
Transact::transfer(transact.clone(), comp, p)
}
}?;
Transact::end_preemption_at(&transact, p)
}
} else if !facility.delay_chain.is_empty(p) {
let FacilityDelayedItem { transact, init_priority, time: t0, preempting, interrupting, cont: cont0 } = {
facility.delay_chain.pop(p).unwrap()
};
facility.update_queue_count(-1, p)?;
match cont0.unfreeze(p)? {
None => capture_facility(facility, p),
Some(cont) => {
let pid = transact.require_process_id(p)?;
let item = FacilityOwnerItem {
transact: transact,
init_priority: init_priority,
time: t,
preempting: preempting,
interrupting: interrupting,
acc_holding_time: 0.0
};
let item = Rc::new(item);
facility.owner.write_at(Some(item), p);
facility.update_wait_time(t - t0, p)?;
facility.update_util_count(1, p)?;
facility.update_capture_count(1, p)?;
facility.update_count(-1, p)?;
enqueue_event(t, {
cons_event(move |p| {
reenter_process(cont, pid, (), p)
}).into_boxed()
}).call_event(p)
}
}
} else {
Result::Ok(())
}
}