use crate::simulation;
use crate::simulation::error::*;
use crate::simulation::Point;
use crate::simulation::ref_comp::RefComp;
use crate::simulation::observable::*;
use crate::simulation::observable::source::*;
use crate::simulation::event::*;
use crate::simulation::process::*;
use crate::simulation::strategy::*;
use dvcompute_utils::simulation::stats::*;
use dvcompute_utils::grc::Grc;
/// Builds a [`Request`] computation for the given resource.
///
/// The function only packages its argument; nothing is requested until the
/// returned computation is run as a process.
#[inline]
pub fn request_resource<S>(resource: Grc<Resource<S>>) -> Request<S>
    where S: QueueStrategy + 'static
{
    Request { resource }
}
/// Builds a [`RequestWithPriority`] computation for the given resource.
///
/// The `priority` is forwarded to the wait list if the request has to be queued.
/// Nothing is requested until the returned computation is run as a process.
#[inline]
pub fn request_resource_with_priority<S>(resource: Grc<Resource<S>>, priority: S::Priority) -> RequestWithPriority<S>
    where S: QueueStrategy + 'static,
          S::Priority: Clone
{
    RequestWithPriority { resource, priority }
}
/// Builds a [`Release`] computation that releases the given resource from within a process.
///
/// The function only packages its argument; the release happens when the
/// returned computation is run.
#[inline]
pub fn release_resource<S>(resource: Grc<Resource<S>>) -> Release<S>
    where S: QueueStrategy
{
    Release { resource }
}
/// Builds a [`ReleaseWithinEvent`] computation that releases the given resource
/// from within an event.
///
/// The function only packages its argument; the release happens when the
/// returned computation is run.
#[inline]
pub fn release_resource_within_event<S>(resource: Grc<Resource<S>>) -> ReleaseWithinEvent<S>
    where S: QueueStrategy
{
    ReleaseWithinEvent { resource }
}
/// Builds a [`TryRequestWithinEvent`] computation for the given resource.
///
/// When run, that computation yields `true` if a unit was acquired and `false`
/// otherwise; it never queues the caller.
#[inline]
pub fn try_request_resource_within_event<S>(resource: Grc<Resource<S>>) -> TryRequestWithinEvent<S>
    where S: QueueStrategy
{
    TryRequestWithinEvent { resource }
}
/// Describes a new resource using [`FCFSStrategy`] whose initial `count`
/// is also its upper bound.
#[inline]
pub fn new_fcfs_resource(count: isize) -> NewResource<FCFSStrategy> {
    let max_count = Some(count);
    NewResource { strategy: FCFSStrategy::Instance, count, max_count }
}
/// Describes a new resource using [`FCFSStrategy`] with the given initial `count`
/// and an optional upper bound (`None` means no bound is enforced).
#[inline]
pub fn new_fcfs_resource_with_max_count(count: isize, max_count: Option<isize>) -> NewResource<FCFSStrategy> {
    let strategy = FCFSStrategy::Instance;
    NewResource { strategy, count, max_count }
}
/// Describes a new resource using [`LCFSStrategy`] whose initial `count`
/// is also its upper bound.
#[inline]
pub fn new_lcfs_resource(count: isize) -> NewResource<LCFSStrategy> {
    let max_count = Some(count);
    NewResource { strategy: LCFSStrategy::Instance, count, max_count }
}
/// Describes a new resource using [`LCFSStrategy`] with the given initial `count`
/// and an optional upper bound (`None` means no bound is enforced).
#[inline]
pub fn new_lcfs_resource_with_max_count(count: isize, max_count: Option<isize>) -> NewResource<LCFSStrategy> {
    let strategy = LCFSStrategy::Instance;
    NewResource { strategy, count, max_count }
}
/// A [`Resource`] parameterised by [`FCFSStrategy`].
pub type FCFSResource = Resource<FCFSStrategy>;
/// A [`Resource`] parameterised by [`LCFSStrategy`].
pub type LCFSResource = Resource<LCFSStrategy>;
/// A countable simulation resource with a wait list governed by the queue strategy `S`.
///
/// Requests are granted immediately while `count` is positive; otherwise the
/// requesting process is stored in `wait_list` until a release hands a unit over.
pub struct Resource<S> where S: QueueStrategy {
/// Optional upper bound for `count`; `None` disables the bound checks.
max_count: Option<isize>,
/// Number of units that can still be granted without waiting.
count: RefComp<isize>,
/// Timing statistics of `count`, extended on every `update_count`.
count_stats: RefComp<TimingStats<isize>>,
/// Triggered with the new value each time `count` is updated.
count_source: ObservableSource<isize>,
/// Utilisation counter; incremented whenever a unit is handed to a process.
util_count: RefComp<isize>,
/// Timing statistics of `util_count`, extended on every `update_util_count`.
util_count_stats: RefComp<TimingStats<isize>>,
/// Triggered with the new value each time `util_count` is updated.
util_count_source: ObservableSource<isize>,
/// Counter of entries in the wait list: +1 on push, -1 on pop.
queue_count: RefComp<isize>,
/// Timing statistics of `queue_count`, extended on every `update_queue_count`.
queue_count_stats: RefComp<TimingStats<isize>>,
/// Triggered with the new value each time `queue_count` is updated.
queue_count_source: ObservableSource<isize>,
/// Running sum of all wait-time samples passed to `update_wait_time`.
total_wait_time: RefComp<f64>,
/// Sampling statistics of the individual wait-time samples.
wait_time: RefComp<SamplingStats<f64>>,
/// Triggered with the updated statistics each time a wait-time sample is recorded.
wait_time_source: ObservableSource<SamplingStats<f64>>,
/// Storage of suspended requests, created by the strategy's `new_storage`.
wait_list: QueueStorageBox<ResourceItem, S::Priority>
}
/// Equality of resources is delegated entirely to their `count` cells.
// NOTE(review): whether this amounts to identity or value comparison depends on
// the `PartialEq` impl of `RefComp` — confirm against that type.
impl<S> PartialEq for Resource<S> where S: QueueStrategy {

    fn eq(&self, other: &Self) -> bool {
        PartialEq::eq(&self.count, &other.count)
    }
}
// Marker impl: `Eq` adds no methods, it only asserts that the `PartialEq`
// comparison of `Resource` is a full equivalence relation.
impl<S> Eq for Resource<S> where S: QueueStrategy {}
/// An entry of the resource wait list: one suspended request.
struct ResourceItem {
/// Simulation time at which the request was queued; used to compute the wait time on release.
time: f64,
/// Identifier of the process that issued the request.
pid: Grc<ProcessId>,
/// Frozen continuation of the requesting process, unfrozen when a unit is handed over.
cont: FrozenProcess<()>
}
impl<S> Resource<S> where S: QueueStrategy {

    /// Describes a new resource with the given strategy whose initial `count`
    /// is also its upper bound.
    #[inline]
    pub fn new(strategy: S, count: isize) -> NewResource<S> {
        NewResource { strategy, count, max_count: Some(count) }
    }

    /// Describes a new resource with the given strategy, initial `count` and
    /// optional upper bound.
    #[inline]
    pub fn new_with_max_count(strategy: S, count: isize, max_count: Option<isize>) -> NewResource<S> {
        NewResource { strategy, count, max_count }
    }

    /// Returns an event that reads the current available count.
    #[inline]
    pub fn count(resource: Grc<Self>) -> impl Event<Item = isize> + Clone {
        cons_event(move |p| Ok(resource.count.read_at(p)))
    }

    /// Returns an event that reads the timing statistics of the available count.
    #[inline]
    pub fn count_stats(resource: Grc<Self>) -> impl Event<Item = TimingStats<isize>> + Clone {
        cons_event(move |p| Ok(resource.count_stats.read_at(p)))
    }

    /// Observable that carries the new value whenever the available count is updated.
    #[inline]
    pub fn count_changed(&self) -> impl Observable<Message = isize> + Clone {
        self.count_source.publish()
    }

    /// Same as [`Resource::count_changed`], but with the value discarded.
    #[inline]
    pub fn count_changed_(&self) -> impl Observable<Message = ()> + Clone {
        self.count_changed().map(|_| ())
    }

    /// Returns an event that reads the current utilisation count.
    #[inline]
    pub fn util_count(resource: Grc<Self>) -> impl Event<Item = isize> + Clone {
        cons_event(move |p| Ok(resource.util_count.read_at(p)))
    }

    /// Returns an event that reads the timing statistics of the utilisation count.
    #[inline]
    pub fn util_count_stats(resource: Grc<Self>) -> impl Event<Item = TimingStats<isize>> + Clone {
        cons_event(move |p| Ok(resource.util_count_stats.read_at(p)))
    }

    /// Observable that carries the new value whenever the utilisation count is updated.
    #[inline]
    pub fn util_count_changed(&self) -> impl Observable<Message = isize> + Clone {
        self.util_count_source.publish()
    }

    /// Same as [`Resource::util_count_changed`], but with the value discarded.
    #[inline]
    pub fn util_count_changed_(&self) -> impl Observable<Message = ()> + Clone {
        self.util_count_changed().map(|_| ())
    }

    /// Returns an event that reads the current queue count.
    #[inline]
    pub fn queue_count(resource: Grc<Self>) -> impl Event<Item = isize> + Clone {
        cons_event(move |p| Ok(resource.queue_count.read_at(p)))
    }

    /// Returns an event that reads the timing statistics of the queue count.
    #[inline]
    pub fn queue_count_stats(resource: Grc<Self>) -> impl Event<Item = TimingStats<isize>> + Clone {
        cons_event(move |p| Ok(resource.queue_count_stats.read_at(p)))
    }

    /// Observable that carries the new value whenever the queue count is updated.
    #[inline]
    pub fn queue_count_changed(&self) -> impl Observable<Message = isize> + Clone {
        self.queue_count_source.publish()
    }

    /// Same as [`Resource::queue_count_changed`], but with the value discarded.
    #[inline]
    pub fn queue_count_changed_(&self) -> impl Observable<Message = ()> + Clone {
        self.queue_count_changed().map(|_| ())
    }

    /// Returns an event that reads the accumulated wait time.
    #[inline]
    pub fn total_wait_time(resource: Grc<Self>) -> impl Event<Item = f64> + Clone {
        cons_event(move |p| Ok(resource.total_wait_time.read_at(p)))
    }

    /// Returns an event that reads the sampling statistics of the wait time.
    #[inline]
    pub fn wait_time(resource: Grc<Self>) -> impl Event<Item = SamplingStats<f64>> + Clone {
        cons_event(move |p| Ok(resource.wait_time.read_at(p)))
    }

    /// Observable that carries the updated statistics whenever a wait-time sample is recorded.
    #[inline]
    pub fn wait_time_changed(&self) -> impl Observable<Message = SamplingStats<f64>> + Clone {
        self.wait_time_source.publish()
    }

    /// Same as [`Resource::wait_time_changed`], but with the value discarded.
    #[inline]
    pub fn wait_time_changed_(&self) -> impl Observable<Message = ()> + Clone {
        self.wait_time_changed().map(|_| ())
    }

    /// Adds `delta` to the available count, extends its timing statistics with
    /// the new value at the current time and notifies the observers.
    #[inline]
    fn update_count(&self, delta: isize, p: &Point) -> simulation::Result<()> {
        let updated = self.count.read_at(p) + delta;
        let updated_stats = self.count_stats.read_at(p).add(p.time, updated);
        self.count.write_at(updated, p);
        self.count_stats.write_at(updated_stats, p);
        self.count_source.trigger_at(&updated, p)
    }

    /// Adds `delta` to the queue count, extends its timing statistics with
    /// the new value at the current time and notifies the observers.
    #[inline]
    fn update_queue_count(&self, delta: isize, p: &Point) -> simulation::Result<()> {
        let updated = self.queue_count.read_at(p) + delta;
        let updated_stats = self.queue_count_stats.read_at(p).add(p.time, updated);
        self.queue_count.write_at(updated, p);
        self.queue_count_stats.write_at(updated_stats, p);
        self.queue_count_source.trigger_at(&updated, p)
    }

    /// Adds `delta` to the utilisation count, extends its timing statistics with
    /// the new value at the current time and notifies the observers.
    #[inline]
    fn update_util_count(&self, delta: isize, p: &Point) -> simulation::Result<()> {
        let updated = self.util_count.read_at(p) + delta;
        let updated_stats = self.util_count_stats.read_at(p).add(p.time, updated);
        self.util_count.write_at(updated, p);
        self.util_count_stats.write_at(updated_stats, p);
        self.util_count_source.trigger_at(&updated, p)
    }

    /// Records one wait-time sample: adds `delta` to the total, adds it to the
    /// sampling statistics and notifies the observers with the new statistics.
    #[inline]
    fn update_wait_time(&self, delta: f64, p: &Point) -> simulation::Result<()> {
        let total = self.total_wait_time.read_at(p) + delta;
        let samples = self.wait_time.read_at(p).add(delta);
        self.total_wait_time.write_at(total, p);
        self.wait_time.write_at(samples, p);
        self.wait_time_source.trigger_at(&samples, p)
    }

    /// Observable that fires whenever the available, utilisation or queue count is updated.
    #[inline]
    pub fn changed_(&self) -> impl Observable<Message = ()> + Clone {
        let count_or_util = self.count_changed_().merge(self.util_count_changed_());
        count_or_util.merge(self.queue_count_changed_())
    }

    /// Returns an event that resets the gathered statistics.
    ///
    /// The three timing statistics restart from a single sample of the current
    /// counter values at the current time; the wait-time total and statistics
    /// are cleared, and the wait-time observers are notified with empty statistics.
    /// The counters themselves are left untouched.
    pub fn reset(resource: Grc<Self>) -> impl Event<Item = ()> + Clone {
        cons_event(move |p| {
            let now = p.time;
            let available = resource.count.read_at(p);
            let utilised = resource.util_count.read_at(p);
            let queued = resource.queue_count.read_at(p);
            resource.count_stats.write_at(TimingStats::from_sample(now, available), p);
            resource.util_count_stats.write_at(TimingStats::from_sample(now, utilised), p);
            resource.queue_count_stats.write_at(TimingStats::from_sample(now, queued), p);
            resource.total_wait_time.write_at(0.0, p);
            resource.wait_time.write_at(SamplingStats::empty(), p);
            resource.wait_time_source.trigger_at(&SamplingStats::empty(), p)
        })
    }
}
/// A computation that creates a [`Resource`] when run as an event.
#[derive(Clone)]
pub struct NewResource<S> {
/// The queue strategy that creates the wait-list storage.
strategy: S,
/// The initial available count; must not be negative.
count: isize,
/// Optional upper bound for the count; `None` disables the bound check.
max_count: Option<isize>
}
impl<S> Event for NewResource<S> where S: QueueStrategy {
type Item = Resource<S>;
#[doc(hidden)]
#[inline]
fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
let NewResource { strategy, count, max_count } = self;
if count < 0 {
let msg = String::from("The resource count cannot be actually negative");
let err = Error::retry(msg);
Result::Err(err)
} else if count > max_count.unwrap_or(count) {
let msg = String::from("The resource count cannot be greater than its upper bound");
let err = Error::retry(msg);
Result::Err(err)
} else {
let t = p.time;
let wait_list = strategy.new_storage();
Result::Ok(Resource {
max_count: max_count,
count: RefComp::new(count),
count_stats: RefComp::new(TimingStats::from_sample(t, count)),
count_source: ObservableSource::new(),
util_count: RefComp::new(0),
util_count_stats: RefComp::new(TimingStats::from_sample(t, 0)),
util_count_source: ObservableSource::new(),
queue_count: RefComp::new(0),
queue_count_stats: RefComp::new(TimingStats::from_sample(t, 0)),
queue_count_source: ObservableSource::new(),
total_wait_time: RefComp::new(0.0),
wait_time: RefComp::new(SamplingStats::empty()),
wait_time_source: ObservableSource::new(),
wait_list: wait_list
})
}
}
}
/// A process computation that requests one unit of a resource,
/// suspending the process in the resource wait list when none is available.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct Request<S>
where S: QueueStrategy + 'static
{
/// The resource to request the unit from.
resource: Grc<Resource<S>>
}
impl<S> Process for Request<S>
    where S: QueueStrategy + 'static
{
    type Item = ();

    /// Grants a unit at once when the available count is positive; otherwise
    /// freezes the continuation and stores it in the wait list.
    #[doc(hidden)]
    #[inline]
    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
    {
        let resource = self.resource;
        if resource.count.read_at(p) > 0 {
            // Immediate grant: a zero wait-time sample is recorded as well.
            resource.update_wait_time(0.0, p)?;
            resource.update_count(-1, p)?;
            resource.update_util_count(1, p)?;
            return resume_process(cont, pid, (), p);
        }
        let queued_at = p.time;
        // A fresh request on the same resource is passed as the re-entering computation.
        // NOTE(review): presumably it is re-run if the frozen process has to
        // re-enter — confirm against `FrozenProcess::with_reentering`.
        let reenter = Request { resource: resource.clone() };
        let boxed = ProcessBoxCont::new(cont);
        let frozen = FrozenProcess::with_reentering(boxed, pid.clone(), (), reenter, p)?;
        let entry = ResourceItem { time: queued_at, pid, cont: frozen };
        resource.wait_list.push(entry, p);
        resource.update_queue_count(1, p)
    }

    /// Same as `call_process`, but for an already boxed continuation.
    #[doc(hidden)]
    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
        let resource = self.resource;
        if resource.count.read_at(p) > 0 {
            // Immediate grant: a zero wait-time sample is recorded as well.
            resource.update_wait_time(0.0, p)?;
            resource.update_count(-1, p)?;
            resource.update_util_count(1, p)?;
            return resume_process_boxed(cont, pid, (), p);
        }
        let queued_at = p.time;
        let reenter = Request { resource: resource.clone() };
        let frozen = FrozenProcess::with_reentering(cont, pid.clone(), (), reenter, p)?;
        let entry = ResourceItem { time: queued_at, pid, cont: frozen };
        resource.wait_list.push(entry, p);
        resource.update_queue_count(1, p)
    }
}
/// A process computation that requests one unit of a resource,
/// queueing the process with the given priority when none is available.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct RequestWithPriority<S>
where S: QueueStrategy + 'static
{
/// The resource to request the unit from.
resource: Grc<Resource<S>>,
/// The priority passed to the wait list when the request is queued.
priority: S::Priority
}
impl<S> Process for RequestWithPriority<S>
    where S: QueueStrategy + 'static,
          S::Priority: Clone
{
    type Item = ();

    /// Grants a unit at once when the available count is positive; otherwise
    /// freezes the continuation and stores it in the wait list with the priority.
    #[doc(hidden)]
    #[inline]
    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
    {
        let RequestWithPriority { resource, priority } = self;
        if resource.count.read_at(p) > 0 {
            // Immediate grant: a zero wait-time sample is recorded as well.
            resource.update_wait_time(0.0, p)?;
            resource.update_count(-1, p)?;
            resource.update_util_count(1, p)?;
            return resume_process(cont, pid, (), p);
        }
        let queued_at = p.time;
        // A fresh request with the same priority is passed as the re-entering computation.
        // NOTE(review): presumably it is re-run if the frozen process has to
        // re-enter — confirm against `FrozenProcess::with_reentering`.
        let reenter = RequestWithPriority { resource: resource.clone(), priority: priority.clone() };
        let boxed = ProcessBoxCont::new(cont);
        let frozen = FrozenProcess::with_reentering(boxed, pid.clone(), (), reenter, p)?;
        let entry = ResourceItem { time: queued_at, pid, cont: frozen };
        resource.wait_list.push_with_priority(priority, entry, p);
        resource.update_queue_count(1, p)
    }

    /// Same as `call_process`, but for an already boxed continuation.
    #[doc(hidden)]
    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
        let RequestWithPriority { resource, priority } = self;
        if resource.count.read_at(p) > 0 {
            // Immediate grant: a zero wait-time sample is recorded as well.
            resource.update_wait_time(0.0, p)?;
            resource.update_count(-1, p)?;
            resource.update_util_count(1, p)?;
            return resume_process_boxed(cont, pid, (), p);
        }
        let queued_at = p.time;
        let reenter = RequestWithPriority { resource: resource.clone(), priority: priority.clone() };
        let frozen = FrozenProcess::with_reentering(cont, pid.clone(), (), reenter, p)?;
        let entry = ResourceItem { time: queued_at, pid, cont: frozen };
        resource.wait_list.push_with_priority(priority, entry, p);
        resource.update_queue_count(1, p)
    }
}
/// A process computation that releases one unit of a resource
/// and then resumes the calling process.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct Release<S> where S: QueueStrategy + 'static {
/// The resource to release the unit of.
resource: Grc<Resource<S>>
}
impl<S> Process for Release<S>
    where S: QueueStrategy
{
    type Item = ();

    /// Performs the release through [`ReleaseWithinEvent`] and, if that
    /// succeeds, resumes the continuation of the calling process.
    #[doc(hidden)]
    #[inline]
    fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
        where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
    {
        let release = ReleaseWithinEvent { resource: self.resource };
        release.call_event(p)?;
        resume_process(cont, pid, (), p)
    }

    /// Same as `call_process`, but for an already boxed continuation.
    #[doc(hidden)]
    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
        let release = ReleaseWithinEvent { resource: self.resource };
        release.call_event(p)?;
        resume_process_boxed(cont, pid, (), p)
    }
}
/// An event computation that releases one unit of a resource, handing it to
/// a waiting process when the wait list is not empty.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct ReleaseWithinEvent<S> where S: QueueStrategy + 'static {
/// The resource to release the unit of.
resource: Grc<Resource<S>>
}
impl<S> Event for ReleaseWithinEvent<S>
    where S: QueueStrategy
{
    type Item = ();

    /// Releases one unit of the resource.
    ///
    /// Fails with a retry error, before any state is touched, when the release
    /// would push the available count above the upper bound.
    ///
    /// Otherwise the utilisation count is decremented for the unit being given
    /// back. The unit is then offered to the wait list: entries are popped
    /// until one can be unfrozen. That process gets the unit (its wait time is
    /// recorded and the utilisation count is incremented again) and its
    /// resumption is enqueued at the current time. If the wait list runs empty,
    /// the unit goes back to the available count.
    #[doc(hidden)]
    fn call_event(self, p: &Point) -> simulation::Result<()> {
        let ReleaseWithinEvent { resource } = self;
        let a = resource.count.read_at(p);
        let a2 = a + 1;
        if a2 > resource.max_count.unwrap_or(a2) {
            let msg = String::from("The resource count cannot be greater than its upper bound");
            let err = Error::retry(msg);
            Result::Err(err)
        } else {
            let t = p.time;
            // The releasing holder no longer uses its unit. Every grant path
            // increments the utilisation count, so the release has to decrement
            // it; otherwise the counter and its statistics could only ever grow.
            resource.update_util_count(-1, p)?;
            loop {
                if resource.wait_list.is_empty(p) {
                    // Nobody is waiting: the unit becomes available again.
                    resource.update_count(1, p)?;
                    break;
                } else {
                    let ResourceItem { time: t0, pid, cont: cont0 } = {
                        // Cannot fail: the wait list was just checked to be non-empty.
                        resource.wait_list.pop(p).unwrap()
                    };
                    resource.update_queue_count(-1, p)?;
                    match cont0.unfreeze(p)? {
                        // This waiter can no longer be resumed; try the next one.
                        None => continue,
                        Some(cont) => {
                            // Hand the unit over: the waiter becomes the new holder.
                            resource.update_wait_time(t - t0, p)?;
                            resource.update_util_count(1, p)?;
                            enqueue_event(t, {
                                cons_event(move |p| {
                                    resume_process_boxed(cont, pid, (), p)
                                }).into_boxed()
                            }).call_event(p)?;
                            break;
                        }
                    }
                }
            }
            Result::Ok(())
        }
    }
}
/// An event computation that tries to acquire one unit of a resource
/// without waiting, yielding whether it succeeded.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct TryRequestWithinEvent<S>
where S: QueueStrategy + 'static
{
/// The resource to request the unit from.
resource: Grc<Resource<S>>
}
impl<S> Event for TryRequestWithinEvent<S>
    where S: QueueStrategy
{
    type Item = bool;

    /// Acquires a unit and yields `true` when the available count is positive;
    /// yields `false` without touching any state otherwise.
    #[doc(hidden)]
    #[inline]
    fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
        let resource = self.resource;
        if resource.count.read_at(p) <= 0 {
            return Ok(false);
        }
        // Granted without waiting: a zero wait-time sample is recorded as well.
        resource.update_wait_time(0.0, p)?;
        resource.update_count(-1, p)?;
        resource.update_util_count(1, p)?;
        Ok(true)
    }
}