use std::rc::Rc;
use std::marker::PhantomData;
use dvcompute::simulation;
use dvcompute::simulation::error::*;
use dvcompute::simulation::Point;
use dvcompute::simulation::ref_comp::RefComp;
use dvcompute::simulation::observable::*;
use dvcompute::simulation::observable::source::*;
use dvcompute::simulation::event::*;
use dvcompute::simulation::process::*;
use dvcompute::simulation::strategy::QueueStorage;
use dvcompute_utils::simulation::stats::*;
use crate::simulation::transact::*;
use crate::simulation::strategy::*;
/// A limited-capacity storage that transacts enter by taking some amount of
/// content and leave by giving it back.
///
/// `T` only appears through the delay chain, whose items hold `Rc<Transact<T>>`.
/// Each counter comes with an observable source that is triggered whenever the
/// counter is updated or the statistics are reset.
pub struct Storage<T> {
/// The capacity the storage was created with; also the initial available content.
capacity: isize,
/// The currently available content: decreased on entry, increased on leave.
content: RefComp<isize>,
/// Timing statistics of the available content.
content_stats: RefComp<TimingStats<isize>>,
/// Triggered with the new available content.
content_source: ObservableSource<isize>,
/// The number of granted entries (incremented by 1 per entry, zeroed on reset).
use_count: RefComp<isize>,
/// Triggered with the new use count.
use_count_source: ObservableSource<isize>,
/// The accumulated amount taken by granted entries
/// (on reset it becomes `capacity - content`).
used_content: RefComp<isize>,
/// Triggered with the new used content.
used_content_source: ObservableSource<isize>,
/// The amount currently held: increased on entry, decreased on leave.
util_count: RefComp<isize>,
/// Timing statistics of the utilisation count.
util_count_stats: RefComp<TimingStats<isize>>,
/// Triggered with the new utilisation count.
util_count_source: ObservableSource<isize>,
/// The number of items waiting in the delay chain.
queue_count: RefComp<isize>,
/// Timing statistics of the queue count.
queue_count_stats: RefComp<TimingStats<isize>>,
/// Triggered with the new queue count.
queue_count_source: ObservableSource<isize>,
/// The sum of all recorded wait times.
total_wait_time: RefComp<f64>,
/// Sampling statistics of the recorded wait times.
wait_time: RefComp<SamplingStats<f64>>,
/// Triggered with the new wait time statistics.
wait_time_source: ObservableSource<SamplingStats<f64>>,
/// The transacts waiting for content, pushed with their transact priority.
delay_chain: FCFSStorage<StorageDelayedItem<T>>
}
/// Storages are compared through their `content` cells only.
///
/// NOTE(review): this relies on how `RefComp` implements `PartialEq`
/// (presumably cell identity rather than the stored value) — confirm against
/// `dvcompute::simulation::ref_comp::RefComp`.
impl<T> PartialEq for Storage<T> {
fn eq(&self, other: &Self) -> bool {
self.content == other.content
}
}
// Marker impl: declares the `PartialEq` relation on `Storage` to be a full equivalence.
impl<T> Eq for Storage<T> {}
/// An entry of the storage delay chain: a transact waiting for content.
struct StorageDelayedItem<T> {
/// The waiting transact.
transact: Rc<Transact<T>>,
/// The simulation time at which the transact was put into the delay chain.
time: f64,
/// The amount of content the transact requests.
decrement: isize,
/// The frozen continuation of the waiting process.
cont: FrozenProcess<()>
}
impl<T> Storage<T> {
    /// Returns a computation that creates a storage with the given `capacity`.
    ///
    /// The storage itself is only built when the returned [`NewStorage`] is run
    /// as an event.
    #[inline]
    pub fn new(capacity: isize) -> NewStorage<T> {
        NewStorage { capacity, _phantom: PhantomData }
    }

    /// Returns the capacity the storage was created with.
    pub fn capacity(&self) -> isize {
        self.capacity
    }

    /// Whether the storage is empty, i.e. the available content equals the capacity.
    #[inline]
    pub fn is_empty(storage: Rc<Self>) -> impl Event<Item = bool> + Clone {
        cons_event(move |p| Ok(storage.content.read_at(p) == storage.capacity))
    }

    /// Whether the storage is full, i.e. the available content is zero.
    #[inline]
    pub fn is_full(storage: Rc<Self>) -> impl Event<Item = bool> + Clone {
        cons_event(move |p| Ok(storage.content.read_at(p) == 0))
    }

    /// Returns the currently available content.
    #[inline]
    pub fn content(storage: Rc<Self>) -> impl Event<Item = isize> + Clone {
        cons_event(move |p| Ok(storage.content.read_at(p)))
    }

    /// Returns the timing statistics of the available content.
    #[inline]
    pub fn content_stats(storage: Rc<Self>) -> impl Event<Item = TimingStats<isize>> + Clone {
        cons_event(move |p| Ok(storage.content_stats.read_at(p)))
    }

    /// Notifies with the new value when the available content is updated.
    #[inline]
    pub fn content_changed(&self) -> impl Observable<Message = isize> + Clone {
        self.content_source.publish()
    }

    /// Like [`Storage::content_changed`], but the message carries no value.
    #[inline]
    pub fn content_changed_(&self) -> impl Observable<Message = ()> + Clone {
        self.content_changed().map(|_| ())
    }

    /// Returns the number of granted entries.
    #[inline]
    pub fn use_count(storage: Rc<Self>) -> impl Event<Item = isize> + Clone {
        cons_event(move |p| Ok(storage.use_count.read_at(p)))
    }

    /// Notifies with the new value when the use count is updated.
    #[inline]
    pub fn use_count_changed(&self) -> impl Observable<Message = isize> + Clone {
        self.use_count_source.publish()
    }

    /// Like [`Storage::use_count_changed`], but the message carries no value.
    #[inline]
    pub fn use_count_changed_(&self) -> impl Observable<Message = ()> + Clone {
        self.use_count_changed().map(|_| ())
    }

    /// Returns the accumulated amount of content taken by granted entries.
    #[inline]
    pub fn used_content(storage: Rc<Self>) -> impl Event<Item = isize> + Clone {
        cons_event(move |p| Ok(storage.used_content.read_at(p)))
    }

    /// Notifies with the new value when the used content is updated.
    #[inline]
    pub fn used_content_changed(&self) -> impl Observable<Message = isize> + Clone {
        self.used_content_source.publish()
    }

    /// Like [`Storage::used_content_changed`], but the message carries no value.
    #[inline]
    pub fn used_content_changed_(&self) -> impl Observable<Message = ()> + Clone {
        self.used_content_changed().map(|_| ())
    }

    /// Returns the amount of content currently held by transacts.
    #[inline]
    pub fn util_count(storage: Rc<Self>) -> impl Event<Item = isize> + Clone {
        cons_event(move |p| Ok(storage.util_count.read_at(p)))
    }

    /// Returns the timing statistics of the utilisation count.
    #[inline]
    pub fn util_count_stats(storage: Rc<Self>) -> impl Event<Item = TimingStats<isize>> + Clone {
        cons_event(move |p| Ok(storage.util_count_stats.read_at(p)))
    }

    /// Notifies with the new value when the utilisation count is updated.
    #[inline]
    pub fn util_count_changed(&self) -> impl Observable<Message = isize> + Clone {
        self.util_count_source.publish()
    }

    /// Like [`Storage::util_count_changed`], but the message carries no value.
    #[inline]
    pub fn util_count_changed_(&self) -> impl Observable<Message = ()> + Clone {
        self.util_count_changed().map(|_| ())
    }

    /// Returns the number of transacts waiting in the delay chain.
    #[inline]
    pub fn queue_count(storage: Rc<Self>) -> impl Event<Item = isize> + Clone {
        cons_event(move |p| Ok(storage.queue_count.read_at(p)))
    }

    /// Returns the timing statistics of the queue count.
    #[inline]
    pub fn queue_count_stats(storage: Rc<Self>) -> impl Event<Item = TimingStats<isize>> + Clone {
        cons_event(move |p| Ok(storage.queue_count_stats.read_at(p)))
    }

    /// Notifies with the new value when the queue count is updated.
    #[inline]
    pub fn queue_count_changed(&self) -> impl Observable<Message = isize> + Clone {
        self.queue_count_source.publish()
    }

    /// Like [`Storage::queue_count_changed`], but the message carries no value.
    #[inline]
    pub fn queue_count_changed_(&self) -> impl Observable<Message = ()> + Clone {
        self.queue_count_changed().map(|_| ())
    }

    /// Returns the sum of all recorded wait times.
    #[inline]
    pub fn total_wait_time(storage: Rc<Self>) -> impl Event<Item = f64> + Clone {
        cons_event(move |p| Ok(storage.total_wait_time.read_at(p)))
    }

    /// Returns the sampling statistics of the recorded wait times.
    #[inline]
    pub fn wait_time(storage: Rc<Self>) -> impl Event<Item = SamplingStats<f64>> + Clone {
        cons_event(move |p| Ok(storage.wait_time.read_at(p)))
    }

    /// Notifies with the new statistics when a wait time is recorded.
    #[inline]
    pub fn wait_time_changed(&self) -> impl Observable<Message = SamplingStats<f64>> + Clone {
        self.wait_time_source.publish()
    }

    /// Like [`Storage::wait_time_changed`], but the message carries no value.
    #[inline]
    pub fn wait_time_changed_(&self) -> impl Observable<Message = ()> + Clone {
        self.wait_time_changed().map(|_| ())
    }

    /// Returns the average holding time per unit of used content.
    ///
    /// The utilisation statistics are extended with the current utilisation
    /// count at the current time, and their `sum` is divided by the used content.
    ///
    /// NOTE(review): there is no guard for a used content of zero; the division
    /// looks like a floating-point one, so the result would then be NaN or
    /// infinite rather than a panic — confirm this is intended.
    #[inline]
    pub fn average_holding_time(storage: Rc<Self>) -> impl Event<Item = f64> + Clone {
        cons_event(move |p| {
            let recorded = storage.util_count_stats.read_at(p);
            let util_count = storage.util_count.read_at(p);
            let used_content = storage.used_content.read_at(p);
            let up_to_now = recorded.add(p.time, util_count);
            Ok(up_to_now.sum / (used_content as f64))
        })
    }

    /// Adds `delta` to the available content, records the new value in the
    /// timing statistics and notifies the listeners.
    #[inline]
    fn update_content(&self, delta: isize, p: &Point) -> simulation::Result<()> {
        let updated = self.content.read_at(p) + delta;
        let stats = self.content_stats.read_at(p).add(p.time, updated);
        self.content.write_at(updated, p);
        self.content_stats.write_at(stats, p);
        self.content_source.trigger_at(&updated, p)
    }

    /// Adds `delta` to the use count and notifies the listeners.
    #[inline]
    fn update_use_count(&self, delta: isize, p: &Point) -> simulation::Result<()> {
        let updated = self.use_count.read_at(p) + delta;
        self.use_count.write_at(updated, p);
        self.use_count_source.trigger_at(&updated, p)
    }

    /// Adds `delta` to the used content and notifies the listeners.
    #[inline]
    fn update_used_content(&self, delta: isize, p: &Point) -> simulation::Result<()> {
        let updated = self.used_content.read_at(p) + delta;
        self.used_content.write_at(updated, p);
        self.used_content_source.trigger_at(&updated, p)
    }

    /// Adds `delta` to the queue count, records the new value in the timing
    /// statistics and notifies the listeners.
    #[inline]
    fn update_queue_count(&self, delta: isize, p: &Point) -> simulation::Result<()> {
        let updated = self.queue_count.read_at(p) + delta;
        let stats = self.queue_count_stats.read_at(p).add(p.time, updated);
        self.queue_count.write_at(updated, p);
        self.queue_count_stats.write_at(stats, p);
        self.queue_count_source.trigger_at(&updated, p)
    }

    /// Adds `delta` to the utilisation count, records the new value in the
    /// timing statistics and notifies the listeners.
    #[inline]
    fn update_util_count(&self, delta: isize, p: &Point) -> simulation::Result<()> {
        let updated = self.util_count.read_at(p) + delta;
        let stats = self.util_count_stats.read_at(p).add(p.time, updated);
        self.util_count.write_at(updated, p);
        self.util_count_stats.write_at(stats, p);
        self.util_count_source.trigger_at(&updated, p)
    }

    /// Records one wait of duration `delta`: adds it to the total, samples it
    /// into the wait time statistics and notifies the listeners.
    #[inline]
    fn update_wait_time(&self, delta: f64, p: &Point) -> simulation::Result<()> {
        let total = self.total_wait_time.read_at(p) + delta;
        let stats = self.wait_time.read_at(p).add(delta);
        self.total_wait_time.write_at(total, p);
        self.wait_time.write_at(stats, p);
        self.wait_time_source.trigger_at(&stats, p)
    }

    /// Notifies when the content, used content, utilisation count or queue
    /// count is updated.
    #[inline]
    pub fn changed_(&self) -> impl Observable<Message = ()> + Clone {
        let content = self.content_changed_();
        let used_content = self.used_content_changed_();
        let util_count = self.util_count_changed_();
        let queue_count = self.queue_count_changed_();
        content.merge(used_content).merge(util_count).merge(queue_count)
    }

    /// Resets the statistics.
    ///
    /// The timing statistics restart from the current values at the current
    /// time, the use count and the wait times are cleared, and the used content
    /// becomes the amount presently taken (`capacity - content`). All sources
    /// are then triggered with the resulting values.
    #[inline]
    pub fn reset(storage: Rc<Self>) -> impl Event<Item = ()> + Clone {
        cons_event(move |p| {
            let now = p.time;
            let available = storage.content.read_at(p);
            let taken = storage.capacity - available;
            let utilised = storage.util_count.read_at(p);
            let queued = storage.queue_count.read_at(p);

            // All state is rewritten before any listener is notified.
            storage.content_stats.write_at(TimingStats::from_sample(now, available), p);
            storage.use_count.write_at(0, p);
            storage.used_content.write_at(taken, p);
            storage.util_count_stats.write_at(TimingStats::from_sample(now, utilised), p);
            storage.queue_count_stats.write_at(TimingStats::from_sample(now, queued), p);
            storage.total_wait_time.write_at(0.0, p);
            storage.wait_time.write_at(SamplingStats::empty(), p);

            storage.content_source.trigger_at(&available, p)?;
            storage.use_count_source.trigger_at(&0, p)?;
            storage.used_content_source.trigger_at(&taken, p)?;
            storage.util_count_source.trigger_at(&utilised, p)?;
            storage.queue_count_source.trigger_at(&queued, p)?;
            storage.wait_time_source.trigger_at(&SamplingStats::empty(), p)
        })
    }
}
/// The computation returned by `Storage::new`; running it as an event creates
/// the storage.
#[derive(Clone)]
pub struct NewStorage<T> {
/// The capacity of the storage to create.
capacity: isize,
/// Ties the computation to the transact data type without storing a value of it.
_phantom: PhantomData<T>
}
impl<T> Event for NewStorage<T> {
type Item = Storage<T>;
#[doc(hidden)]
#[inline]
fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
let t = p.time;
let capacity = self.capacity;
Result::Ok(Storage {
capacity: capacity,
content: RefComp::new(capacity),
content_stats: RefComp::new(TimingStats::from_sample(t, capacity)),
content_source: ObservableSource::new(),
use_count: RefComp::new(0),
use_count_source: ObservableSource::new(),
used_content: RefComp::new(0),
used_content_source: ObservableSource::new(),
util_count: RefComp::new(0),
util_count_stats: RefComp::new(TimingStats::from_sample(t, 0)),
util_count_source: ObservableSource::new(),
queue_count: RefComp::new(0),
queue_count_stats: RefComp::new(TimingStats::from_sample(t, 0)),
queue_count_source: ObservableSource::new(),
total_wait_time: RefComp::new(0.0),
wait_time: RefComp::new(SamplingStats::empty()),
wait_time_source: ObservableSource::new(),
delay_chain: FCFSStorage::new()
})
}
}
/// The process computation created by `enter_storage`: a transact entering a
/// storage and taking `decrement` of its content.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct Enter<T> {
/// The storage to enter.
storage: Rc<Storage<T>>,
/// The entering transact.
transact: Rc<Transact<T>>,
/// The amount of content to take.
decrement: isize
}
impl<T> Process for Enter<T>
    where T: 'static
{
    type Item = ();

    /// Boxes the continuation and delegates to `call_process_boxed`.
    #[doc(hidden)]
    #[inline]
    fn call_process<C>(self, cont: C, pid: Rc<ProcessId>, p: &Point) -> simulation::Result<()>
        where C: FnOnce(simulation::Result<Self::Item>, Rc<ProcessId>, &Point) -> simulation::Result<()> + 'static
    {
        self.call_process_boxed(ProcessBoxCont::new(cont), pid, p)
    }

    /// Tries to enter the storage.
    ///
    /// With an empty delay chain the request is handled by
    /// `enter_free_storage_boxed`. Otherwise the transact joins the delay chain
    /// behind the transacts already waiting, whatever content is available.
    #[doc(hidden)]
    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Rc<ProcessId>, p: &Point) -> simulation::Result<()> {
        let Enter { storage, transact, decrement } = self;

        if storage.delay_chain.is_empty(p) {
            return enter_free_storage_boxed(storage, transact, decrement, cont, pid, p);
        }

        // The frozen continuation carries a fresh `Enter` computation as its
        // re-entering computation.
        let reentry = enter_storage(storage.clone(), transact.clone(), decrement);
        let frozen = FrozenProcess::with_reentering(cont, pid, (), reentry, p)?;
        let priority = Transact::priority_at(&transact, p);
        let delayed = StorageDelayedItem {
            transact,
            time: p.time,
            decrement,
            cont: frozen,
        };
        storage.delay_chain.push_with_priority(priority, delayed, p);
        storage.update_queue_count(1, p)
    }
}
/// Creates the computation by which `transact` enters `storage`, taking
/// `decrement` of its content.
#[inline]
pub fn enter_storage<T>(storage: Rc<Storage<T>>, transact: Rc<Transact<T>>, decrement: isize) -> Enter<T> {
    Enter { storage, transact, decrement }
}
/// Handles an entry request when no other transact is waiting.
///
/// If the available content covers `decrement`, the entry is granted at once:
/// a zero wait time is recorded, the counters are updated and the process is
/// resumed. Otherwise the transact is put into the delay chain.
fn enter_free_storage_boxed<T>(storage: Rc<Storage<T>>, transact: Rc<Transact<T>>, decrement: isize,
    cont: ProcessBoxCont<()>, pid: Rc<ProcessId>, p: &Point) -> simulation::Result<()>
    where T: 'static
{
    let available = storage.content.read_at(p);

    if available >= decrement {
        storage.update_wait_time(0.0, p)?;
        storage.update_content(-decrement, p)?;
        storage.update_use_count(1, p)?;
        storage.update_used_content(decrement, p)?;
        storage.update_util_count(decrement, p)?;
        return resume_process_boxed(cont, pid, (), p);
    }

    // Not enough content: freeze the process and queue it.
    let reentry = enter_storage(storage.clone(), transact.clone(), decrement);
    let frozen = FrozenProcess::with_reentering(cont, pid, (), reentry, p)?;
    let priority = Transact::priority_at(&transact, p);
    let delayed = StorageDelayedItem {
        transact,
        time: p.time,
        decrement,
        cont: frozen,
    };
    storage.delay_chain.push_with_priority(priority, delayed, p);
    storage.update_queue_count(1, p)
}
/// The process computation created by `leave_storage`: gives `increment` of
/// content back to the storage.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct Leave<T> {
/// The storage to leave.
storage: Rc<Storage<T>>,
/// The amount of content to give back.
increment: isize
}
impl<T> Process for Leave<T>
    where T: 'static
{
    type Item = ();

    /// Runs the leave as an event and then resumes the process.
    #[doc(hidden)]
    #[inline]
    fn call_process<C>(self, cont: C, pid: Rc<ProcessId>, p: &Point) -> simulation::Result<()>
        where C: FnOnce(simulation::Result<Self::Item>, Rc<ProcessId>, &Point) -> simulation::Result<()> + 'static
    {
        let release = leave_storage_within_event(self.storage, self.increment);
        release.call_event(p)?;
        resume_process(cont, pid, (), p)
    }

    /// Same as `call_process`, for an already boxed continuation.
    #[doc(hidden)]
    fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Rc<ProcessId>, p: &Point) -> simulation::Result<()> {
        let release = leave_storage_within_event(self.storage, self.increment);
        release.call_event(p)?;
        resume_process_boxed(cont, pid, (), p)
    }
}
/// Creates the process computation that leaves `storage`, giving back
/// `increment` of its content.
#[inline]
pub fn leave_storage<T>(storage: Rc<Storage<T>>, increment: isize) -> Leave<T> {
    Leave { storage, increment }
}
/// The event computation created by `leave_storage_within_event`: gives
/// `increment` of content back to the storage.
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct LeaveWithinEvent<T> {
/// The storage to leave.
storage: Rc<Storage<T>>,
/// The amount of content to give back.
increment: isize
}
impl<T> Event for LeaveWithinEvent<T>
    where T: 'static
{
    type Item = ();

    /// Gives the content back and schedules an attempt to admit a waiting
    /// transact.
    ///
    /// The utilisation count is decreased and the available content increased
    /// immediately; the admission attempt is enqueued as a separate event at
    /// the current time.
    #[doc(hidden)]
    fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
        let LeaveWithinEvent { storage, increment } = self;
        let now = p.time;

        storage.update_util_count(-increment, p)?;
        storage.update_content(increment, p)?;

        let admission = cons_event(move |p| try_enter_storage(storage, p)).into_boxed();
        enqueue_event(now, admission).call_event(p)
    }
}
/// Creates the event computation that leaves `storage`, giving back
/// `increment` of its content.
#[inline]
pub fn leave_storage_within_event<T>(storage: Rc<Storage<T>>, increment: isize) -> LeaveWithinEvent<T> {
    LeaveWithinEvent { storage, increment }
}
/// Tries to admit a delayed transact, but only if some content is available.
#[inline]
fn try_enter_storage<T>(storage: Rc<Storage<T>>, p: &Point) -> simulation::Result<()> {
    if storage.content.read_at(p) <= 0 {
        // Nothing to hand out.
        return Ok(());
    }
    let_enter_storage(storage, p)
}
#[inline]
fn let_enter_storage<T>(storage: Rc<Storage<T>>, p: &Point) -> simulation::Result<()> {
let t = p.time;
let a = storage.content.read_at(p);
if a > storage.capacity {
let msg = String::from("The storage content cannot exceed the limited capacity");
let err = Error::retry(msg);
Result::Err(err)
} else {
match storage.delay_chain.remove_by(move |i| { i.decrement <= a }, p) {
None => Result::Ok(()),
Some(StorageDelayedItem { transact: transact0, time: t0, decrement: decrement0, cont: cont0 }) => {
storage.update_queue_count(-1, p)?;
match cont0.unfreeze(p)? {
None => let_enter_storage(storage, p),
Some(cont) => {
let pid = transact0.require_process_id(p)?;
storage.update_content(- decrement0, p)?;
storage.update_wait_time(t - t0, p)?;
storage.update_util_count(decrement0, p)?;
storage.update_use_count(1, p)?;
storage.update_used_content(decrement0, p)?;
enqueue_event(t, {
cons_event(move |p| {
reenter_process(cont, pid, (), p)
}).into_boxed()
}).call_event(p)
}
}
}
}
}
}