use std::marker::PhantomData;
use crate::simulation;
use crate::simulation::error::*;
use crate::simulation::Point;
use crate::simulation::ref_comp::RefComp;
use crate::simulation::observable::*;
use crate::simulation::observable::source::*;
use crate::simulation::simulation::*;
use crate::simulation::event::*;
use crate::simulation::process::*;
use crate::simulation::strategy::*;
use crate::simulation::resource::*;
use dvcompute_utils::simulation::stats::*;
use dvcompute_utils::grc::Grc;
pub type FCFSQueue<T> = Queue<FCFSStrategy, FCFSStrategy, FCFSStrategy, T>;
pub type LCFSQueue<T> = Queue<FCFSStrategy, LCFSStrategy, FCFSStrategy, T>;
pub struct Queue<SI, SM, SO, T>
where SI: QueueStrategy + 'static,
SM: QueueStrategy,
SO: QueueStrategy + 'static,
T: 'static
{
max_count: isize,
enqueue_resource: Grc<Resource<SI>>,
queue_store: QueueStorageBox<QueueItem<T>, SM::Priority>,
dequeue_resource: Grc<Resource<SO>>,
count: RefComp<isize>,
count_stats: RefComp<TimingStats<isize>>,
enqueue_count: RefComp<isize>,
enqueue_lost_count: RefComp<isize>,
enqueue_store_count: RefComp<isize>,
dequeue_count: RefComp<isize>,
dequeue_extract_count: RefComp<isize>,
wait_time: RefComp<SamplingStats<f64>>,
total_wait_time: RefComp<SamplingStats<f64>>,
enqueue_wait_time: RefComp<SamplingStats<f64>>,
dequeue_wait_time: RefComp<SamplingStats<f64>>,
enqueue_initiated_source: ObservableSource<T>,
enqueue_lost_source: ObservableSource<T>,
enqueue_stored_source: ObservableSource<T>,
dequeue_requested_source: ObservableSource<()>,
dequeue_extracted_source: ObservableSource<T>
}
#[derive(Clone)]
struct QueueItem<T> {
value: T,
input_time: f64,
storing_time: f64
}
#[inline]
pub fn new_fcfs_queue<T>(max_count: isize) -> NewQueue<FCFSStrategy, FCFSStrategy, FCFSStrategy, T>
where T: 'static
{
NewQueue {
enqueue_strategy: FCFSStrategy::Instance,
storing_strategy: FCFSStrategy::Instance,
dequeue_strategy: FCFSStrategy::Instance,
max_count: max_count,
_phantom: PhantomData
}
}
#[inline]
pub fn new_lcfs_queue<T>(max_count: isize) -> NewQueue<FCFSStrategy, LCFSStrategy, FCFSStrategy, T>
where T: 'static
{
NewQueue {
enqueue_strategy: FCFSStrategy::Instance,
storing_strategy: LCFSStrategy::Instance,
dequeue_strategy: FCFSStrategy::Instance,
max_count: max_count,
_phantom: PhantomData
}
}
impl<SI, SM, SO, T> Queue<SI, SM, SO, T>
where SI: QueueStrategy + 'static,
SM: QueueStrategy + 'static,
SO: QueueStrategy + 'static,
T: Clone + 'static
{
#[inline]
pub fn new(enqueue_strategy: SI,
storing_strategy: SM,
dequeue_strategy: SO,
max_count: isize) -> NewQueue<SI, SM, SO, T>
{
NewQueue {
enqueue_strategy: enqueue_strategy,
storing_strategy: storing_strategy,
dequeue_strategy: dequeue_strategy,
max_count: max_count,
_phantom: PhantomData
}
}
#[inline]
pub fn max_count(&self) -> isize {
self.max_count
}
#[inline]
pub fn is_empty(queue: Grc<Self>) -> impl Event<Item = bool> + Clone {
cons_event(move |p| {
Result::Ok(queue.count.read_at(p) == 0)
})
}
#[inline]
pub fn is_empty_changed(queue: Grc<Self>) -> impl Observable<Message = bool> + Clone {
queue.is_empty_changed_()
.mapc(move |()| {
Queue::is_empty(queue.clone())
})
}
#[inline]
pub fn is_empty_changed_(&self) -> impl Observable<Message = ()> + Clone {
self.count_changed_()
}
#[inline]
pub fn is_full(queue: Grc<Self>) -> impl Event<Item = bool> + Clone {
cons_event(move |p| {
Result::Ok(queue.count.read_at(p) == queue.max_count)
})
}
#[inline]
pub fn is_full_changed(queue: Grc<Self>) -> impl Observable<Message = bool> + Clone {
queue.is_full_changed_()
.mapc(move |()| {
Queue::is_full(queue.clone())
})
}
#[inline]
pub fn is_full_changed_(&self) -> impl Observable<Message = ()> + Clone {
self.count_changed_()
}
#[inline]
pub fn count(queue: Grc<Self>) -> impl Event<Item = isize> + Clone {
cons_event(move |p| {
Result::Ok(queue.count.read_at(p))
})
}
#[inline]
pub fn count_stats(queue: Grc<Self>) -> impl Event<Item = TimingStats<isize>> + Clone {
cons_event(move |p| {
Result::Ok(queue.count_stats.read_at(p))
})
}
#[inline]
pub fn count_changed(queue: Grc<Self>) -> impl Observable<Message = isize> + Clone {
queue.count_changed_()
.mapc(move |()| {
Queue::count(queue.clone())
})
}
#[inline]
pub fn count_changed_(&self) -> impl Observable<Message = ()> + Clone {
self.enqueue_stored().map(|_| {})
.merge(self.dequeue_extracted().map(|_| {}))
}
#[inline]
pub fn enqueue_count(queue: Grc<Self>) -> impl Event<Item = isize> + Clone {
cons_event(move |p| {
Result::Ok(queue.enqueue_count.read_at(p))
})
}
#[inline]
pub fn enqueue_count_changed(queue: Grc<Self>) -> impl Observable<Message = isize> + Clone {
queue.enqueue_count_changed_()
.mapc(move |()| {
Queue::enqueue_count(queue.clone())
})
}
#[inline]
pub fn enqueue_count_changed_(&self) -> impl Observable<Message = ()> + Clone {
self.enqueue_initiated().map(|_| {})
}
#[inline]
pub fn enqueue_lost_count(queue: Grc<Self>) -> impl Event<Item = isize> + Clone {
cons_event(move |p| {
Result::Ok(queue.enqueue_lost_count.read_at(p))
})
}
#[inline]
pub fn enqueue_lost_count_changed(queue: Grc<Self>) -> impl Observable<Message = isize> + Clone {
queue.enqueue_lost_count_changed_()
.mapc(move |()| {
Queue::enqueue_lost_count(queue.clone())
})
}
#[inline]
pub fn enqueue_lost_count_changed_(&self) -> impl Observable<Message = ()> + Clone {
self.enqueue_lost().map(|_| {})
}
#[inline]
pub fn enqueue_store_count(queue: Grc<Self>) -> impl Event<Item = isize> + Clone {
cons_event(move |p| {
Result::Ok(queue.enqueue_store_count.read_at(p))
})
}
#[inline]
pub fn enqueue_store_count_changed(queue: Grc<Self>) -> impl Observable<Message = isize> + Clone {
queue.enqueue_store_count_changed_()
.mapc(move |()| {
Queue::enqueue_store_count(queue.clone())
})
}
#[inline]
pub fn enqueue_store_count_changed_(&self) -> impl Observable<Message = ()> + Clone {
self.enqueue_stored().map(|_| {})
}
#[inline]
pub fn dequeue_count(queue: Grc<Self>) -> impl Event<Item = isize> + Clone {
cons_event(move |p| {
Result::Ok(queue.dequeue_count.read_at(p))
})
}
#[inline]
pub fn dequeue_count_changed(queue: Grc<Self>) -> impl Observable<Message = isize> + Clone {
queue.dequeue_count_changed_()
.mapc(move |()| {
Queue::dequeue_count(queue.clone())
})
}
#[inline]
pub fn dequeue_count_changed_(&self) -> impl Observable<Message = ()> + Clone {
self.dequeue_requested()
}
#[inline]
pub fn dequeue_extract_count(queue: Grc<Self>) -> impl Event<Item = isize> + Clone {
cons_event(move |p| {
Result::Ok(queue.dequeue_extract_count.read_at(p))
})
}
#[inline]
pub fn dequeue_extract_count_changed(queue: Grc<Self>) -> impl Observable<Message = isize> + Clone {
queue.dequeue_extract_count_changed_()
.mapc(move |()| {
Queue::dequeue_extract_count(queue.clone())
})
}
#[inline]
pub fn dequeue_extract_count_changed_(&self) -> impl Observable<Message = ()> + Clone {
self.dequeue_extracted().map(|_| {})
}
#[inline]
pub fn load_factor(queue: Grc<Self>) -> impl Event<Item = f64> + Clone {
cons_event(move |p| {
Result::Ok({
let x = queue.count.read_at(p);
let y = queue.max_count;
(x as f64) / (y as f64)
})
})
}
#[inline]
pub fn load_factor_changed(queue: Grc<Self>) -> impl Observable<Message = f64> + Clone {
queue.load_factor_changed_()
.mapc(move |()| {
Queue::load_factor(queue.clone())
})
}
#[inline]
pub fn load_factor_changed_(&self) -> impl Observable<Message = ()> + Clone {
self.count_changed_()
}
#[inline]
pub fn enqueue_rate(queue: Grc<Self>) -> impl Event<Item = f64> + Clone {
cons_event(move |p| {
Result::Ok({
let x = queue.enqueue_count.read_at(p);
let t0 = p.run.specs.start_time;
let t = p.time;
(x as f64) / (t - t0)
})
})
}
#[inline]
pub fn store_rate(queue: Grc<Self>) -> impl Event<Item = f64> + Clone {
cons_event(move |p| {
Result::Ok({
let x = queue.enqueue_store_count.read_at(p);
let t0 = p.run.specs.start_time;
let t = p.time;
(x as f64) / (t - t0)
})
})
}
#[inline]
pub fn dequeue_rate(queue: Grc<Self>) -> impl Event<Item = f64> + Clone {
cons_event(move |p| {
Result::Ok({
let x = queue.dequeue_count.read_at(p);
let t0 = p.run.specs.start_time;
let t = p.time;
(x as f64) / (t - t0)
})
})
}
#[inline]
pub fn dequeue_extract_rate(queue: Grc<Self>) -> impl Event<Item = f64> + Clone {
cons_event(move |p| {
Result::Ok({
let x = queue.dequeue_extract_count.read_at(p);
let t0 = p.run.specs.start_time;
let t = p.time;
(x as f64) / (t - t0)
})
})
}
#[inline]
pub fn wait_time(queue: Grc<Self>) -> impl Event<Item = SamplingStats<f64>> + Clone {
cons_event(move |p| {
Result::Ok(queue.wait_time.read_at(p))
})
}
#[inline]
pub fn wait_time_changed(queue: Grc<Self>) -> impl Observable<Message = SamplingStats<f64>> + Clone {
queue.wait_time_changed_()
.mapc(move |()| {
Queue::wait_time(queue.clone())
})
}
#[inline]
pub fn wait_time_changed_(&self) -> impl Observable<Message = ()> + Clone {
self.dequeue_extracted().map(|_| {})
}
#[inline]
pub fn total_wait_time(queue: Grc<Self>) -> impl Event<Item = SamplingStats<f64>> + Clone {
cons_event(move |p| {
Result::Ok(queue.total_wait_time.read_at(p))
})
}
#[inline]
pub fn total_wait_time_changed(queue: Grc<Self>) -> impl Observable<Message = SamplingStats<f64>> + Clone {
queue.total_wait_time_changed_()
.mapc(move |()| {
Queue::total_wait_time(queue.clone())
})
}
#[inline]
pub fn total_wait_time_changed_(&self) -> impl Observable<Message = ()> + Clone {
self.dequeue_extracted().map(|_| {})
}
#[inline]
pub fn enqueue_wait_time(queue: Grc<Self>) -> impl Event<Item = SamplingStats<f64>> + Clone {
cons_event(move |p| {
Result::Ok(queue.enqueue_wait_time.read_at(p))
})
}
#[inline]
pub fn enqueue_wait_time_changed(queue: Grc<Self>) -> impl Observable<Message = SamplingStats<f64>> + Clone {
queue.enqueue_wait_time_changed_()
.mapc(move |()| {
Queue::enqueue_wait_time(queue.clone())
})
}
#[inline]
pub fn enqueue_wait_time_changed_(&self) -> impl Observable<Message = ()> + Clone {
self.enqueue_stored().map(|_| {})
}
#[inline]
pub fn dequeue_wait_time(queue: Grc<Self>) -> impl Event<Item = SamplingStats<f64>> + Clone {
cons_event(move |p| {
Result::Ok(queue.dequeue_wait_time.read_at(p))
})
}
#[inline]
pub fn dequeue_wait_time_changed(queue: Grc<Self>) -> impl Observable<Message = SamplingStats<f64>> + Clone {
queue.dequeue_wait_time_changed_()
.mapc(move |()| {
Queue::dequeue_wait_time(queue.clone())
})
}
#[inline]
pub fn dequeue_wait_time_changed_(&self) -> impl Observable<Message = ()> + Clone {
self.dequeue_extracted().map(|_| {})
}
#[inline]
pub fn rate(queue: Grc<Self>) -> impl Event<Item = f64> + Clone {
cons_event(move |p| {
Result::Ok({
let x = queue.count_stats.read_at(p);
let y = queue.wait_time.read_at(p);
x.mean() / y.mean
})
})
}
#[inline]
pub fn rate_changed(queue: Grc<Self>) -> impl Observable<Message = f64> + Clone {
queue.rate_changed_()
.mapc(move |()| {
Queue::rate(queue.clone())
})
}
#[inline]
pub fn rate_changed_(&self) -> impl Observable<Message = ()> + Clone {
self.enqueue_stored().map(|_| {})
.merge(self.dequeue_extracted().map(|_| {}))
}
pub fn dequeue(queue: Grc<Self>) -> impl Process<Item = T> {
cons_event({
let queue = queue.clone();
move |p| {
queue.dequeue_request(p)
}
})
.into_process()
.and_then(move |t| {
request_resource(queue.dequeue_resource.clone())
.and_then(move |()| {
cons_event(move |p| {
queue.dequeue_extract(t, p)
})
.into_process()
})
})
}
pub fn dequeue_with_output_priority(queue: Grc<Self>, po: SO::Priority) -> impl Process<Item = T>
where SO::Priority: Clone
{
cons_event({
let queue = queue.clone();
move |p| {
queue.dequeue_request(p)
}
})
.into_process()
.and_then(move |t| {
request_resource_with_priority(queue.dequeue_resource.clone(), po)
.and_then(move |()| {
cons_event(move |p| {
queue.dequeue_extract(t, p)
})
.into_process()
})
})
}
pub fn try_dequeue(queue: Grc<Self>) -> impl Event<Item = Option<T>> {
try_request_resource_within_event(queue.dequeue_resource.clone())
.and_then(move |f| {
if f {
cons_event(move |p| {
let t = queue.dequeue_request(p)?;
let x = queue.dequeue_extract(t, p)?;
Result::Ok(Some(x))
}).into_boxed()
} else {
return_event(None)
.into_boxed()
}
})
}
pub fn delete(queue: Grc<Self>, item: T) -> impl Event<Item = bool>
where T: PartialEq
{
let pred = move |x: &T| { *x == item };
Queue::delete_by(queue, pred)
.map(|x| { x.is_some() })
}
pub fn delete_(queue: Grc<Self>, item: T) -> impl Event<Item = ()>
where T: PartialEq
{
let pred = move |x: &T| { *x == item };
Queue::delete_by(queue, pred)
.map(|_| ())
}
pub fn delete_by<F>(queue: Grc<Self>, pred: F) -> impl Event<Item = Option<T>>
where F: Fn(&T) -> bool + 'static
{
try_request_resource_within_event(queue.dequeue_resource.clone())
.and_then(move |f| {
if f {
cons_event(move |p| {
let pred = move |x: &QueueItem<T>| { pred(&x.value) };
let pred = Box::new(pred);
match queue.queue_store.remove_boxed_by(pred, p) {
None => {
release_resource_within_event(queue.dequeue_resource.clone())
.call_event(p)?;
Result::Ok(None)
},
Some(i) => {
let t = queue.dequeue_request(p)?;
let x = queue.dequeue_post_extract(t, i, p)?;
Result::Ok(Some(x))
}
}
}).into_boxed()
} else {
return_event(None)
.into_boxed()
}
})
}
pub fn exists<F>(queue: Grc<Self>, pred: F) -> impl Event<Item = bool>
where F: Fn(&T) -> bool + 'static
{
cons_event(move |p| {
let pred = move |x: &QueueItem<T>| { pred(&x.value) };
let pred = Box::new(pred);
Result::Ok(queue.queue_store.exists_boxed(pred, p))
})
}
pub fn find<F>(queue: Grc<Self>, pred: F) -> impl Event<Item = Option<T>>
where F: Fn(&T) -> bool + 'static,
T: Clone
{
cons_event(move |p| {
let pred = move |x: &QueueItem<T>| { pred(&x.value) };
let pred = Box::new(pred);
Result::Ok(queue.queue_store.find_boxed(pred, p).map(|x| { x.value.clone() }))
})
}
pub fn clear(queue: Grc<Self>) -> impl Event<Item = ()> {
cons_event(move |p| {
loop {
let x = Queue::try_dequeue(queue.clone()).call_event(p)?;
match x {
None => return Result::Ok(()),
Some(_) => {}
}
}
})
}
pub fn enqueue(queue: Grc<Self>, item: T) -> impl Process<Item = ()> {
cons_event({
let queue = queue.clone();
move |p| {
queue.enqueue_initiate(item, p)
}
})
.into_process()
.and_then(move |i| {
request_resource(queue.enqueue_resource.clone())
.and_then(move |()| {
cons_event(move |p| {
queue.enqueue_store(i, p)
})
.into_process()
})
})
}
pub fn enqueue_with_input_priority(queue: Grc<Self>, pi: SI::Priority, item: T) -> impl Process<Item = ()>
where SI::Priority: Clone
{
cons_event({
let queue = queue.clone();
move |p| {
queue.enqueue_initiate(item, p)
}
})
.into_process()
.and_then(move |i| {
request_resource_with_priority(queue.enqueue_resource.clone(), pi)
.and_then(move |()| {
cons_event(move |p| {
queue.enqueue_store(i, p)
})
.into_process()
})
})
}
pub fn enqueue_with_storing_priority(queue: Grc<Self>, pm: SM::Priority, item: T) -> impl Process<Item = ()>
where SM::Priority: Clone
{
cons_event({
let queue = queue.clone();
move |p| {
queue.enqueue_initiate(item, p)
}
})
.into_process()
.and_then(move |i| {
request_resource(queue.enqueue_resource.clone())
.and_then(move |()| {
cons_event(move |p| {
queue.enqueue_store_with_priority(pm, i, p)
})
.into_process()
})
})
}
pub fn enqueue_with_input_and_storing_priorities(queue: Grc<Self>, pi: SI::Priority, pm: SM::Priority, item: T) -> impl Process<Item = ()>
where SI::Priority: Clone,
SM::Priority: Clone
{
cons_event({
let queue = queue.clone();
move |p| {
queue.enqueue_initiate(item, p)
}
})
.into_process()
.and_then(move |i| {
request_resource_with_priority(queue.enqueue_resource.clone(), pi)
.and_then(move |()| {
cons_event(move |p| {
queue.enqueue_store_with_priority(pm, i, p)
})
.into_process()
})
})
}
pub fn try_enqueue(queue: Grc<Self>, item: T) -> impl Event<Item = bool> {
cons_event(move |p| {
let x = {
try_request_resource_within_event(queue.enqueue_resource.clone())
.call_event(p)
}?;
if x {
let i = queue.enqueue_initiate(item, p)?;
queue.enqueue_store(i, p)?;
Result::Ok(true)
} else {
Result::Ok(false)
}
})
}
pub fn try_enqueue_with_storing_priority(queue: Grc<Self>, pm: SM::Priority, item: T) -> impl Event<Item = bool> {
cons_event(move |p| {
let x = {
try_request_resource_within_event(queue.enqueue_resource.clone())
.call_event(p)
}?;
if x {
let i = queue.enqueue_initiate(item, p)?;
queue.enqueue_store_with_priority(pm, i, p)?;
Result::Ok(true)
} else {
Result::Ok(false)
}
})
}
pub fn enqueue_or_lose(queue: Grc<Self>, item: T) -> impl Event<Item = bool> {
cons_event(move |p| {
let x = {
try_request_resource_within_event(queue.enqueue_resource.clone())
.call_event(p)
}?;
if x {
let i = queue.enqueue_initiate(item, p)?;
queue.enqueue_store(i, p)?;
Result::Ok(true)
} else {
queue.enqueue_deny(item, p)?;
Result::Ok(false)
}
})
}
pub fn enqueue_with_storing_priority_or_lose(queue: Grc<Self>, pm: SM::Priority, item: T) -> impl Event<Item = bool> {
cons_event(move |p| {
let x = {
try_request_resource_within_event(queue.enqueue_resource.clone())
.call_event(p)
}?;
if x {
let i = queue.enqueue_initiate(item, p)?;
queue.enqueue_store_with_priority(pm, i, p)?;
Result::Ok(true)
} else {
queue.enqueue_deny(item, p)?;
Result::Ok(false)
}
})
}
pub fn enqueue_or_lose_(queue: Grc<Self>, item: T) -> impl Event<Item = ()> {
Queue::enqueue_or_lose(queue, item)
.map(|_| {})
}
pub fn enqueue_with_storing_priority_or_lose_(queue: Grc<Self>, pm: SM::Priority, item: T) -> impl Event<Item = ()> {
Queue::enqueue_with_storing_priority_or_lose(queue, pm, item)
.map(|_| {})
}
#[inline]
pub fn enqueue_initiated(&self) -> impl Observable<Message = T> + Clone {
self.enqueue_initiated_source.publish()
}
#[inline]
pub fn enqueue_stored(&self) -> impl Observable<Message = T> + Clone {
self.enqueue_stored_source.publish()
}
#[inline]
pub fn enqueue_lost(&self) -> impl Observable<Message = T> + Clone {
self.enqueue_lost_source.publish()
}
#[inline]
pub fn dequeue_requested(&self) -> impl Observable<Message = ()> + Clone {
self.dequeue_requested_source.publish()
}
#[inline]
pub fn dequeue_extracted(&self) -> impl Observable<Message = T> + Clone {
self.dequeue_extracted_source.publish()
}
#[inline]
pub fn changed_(&self) -> impl Observable<Message = ()> + Clone {
self.enqueue_initiated().map(|_| {})
.merge(self.enqueue_stored().map(|_| {}))
.merge(self.enqueue_lost().map(|_| {}))
.merge(self.dequeue_requested())
.merge(self.dequeue_extracted().map(|_| {}))
}
fn dequeue_request(&self, p: &Point) -> simulation::Result<f64> {
let c = self.dequeue_count.read_at(p);
let c2 = c + 1;
self.dequeue_count.write_at(c2, p);
self.dequeue_requested_source.trigger_at(&(), p)?;
Result::Ok(p.time)
}
fn dequeue_extract(&self, t_r: f64, p: &Point) -> simulation::Result<T> {
let i = self.queue_store.pop(p).unwrap();
self.dequeue_post_extract(t_r, i, p)
}
fn dequeue_post_extract(&self, t_r: f64, i: QueueItem<T>, p: &Point) -> simulation::Result<T> {
let t = p.time;
let c = self.count.read_at(p);
let c2 = c - 1;
let stats = self.count_stats.read_at(p);
let stats2 = stats.add(t, c2);
let ec = self.dequeue_extract_count.read_at(p);
let ec2 = ec + 1;
self.count.write_at(c2, p);
self.count_stats.write_at(stats2, p);
self.dequeue_extract_count.write_at(ec2, p);
self.dequeue_stat(t_r, &i, p);
release_resource_within_event(self.enqueue_resource.clone())
.call_event(p)?;
self.dequeue_extracted_source
.trigger_at(&i.value, p)?;
Result::Ok(i.value)
}
fn dequeue_stat(&self, t_r: f64, i: &QueueItem<T>, p: &Point) {
let t0 = i.input_time;
let t1 = i.storing_time;
let t = p.time;
let stats = self.dequeue_wait_time.read_at(p);
let stats2 = stats.add(t - t_r);
self.dequeue_wait_time.write_at(stats2, p);
let stats = self.total_wait_time.read_at(p);
let stats2 = stats.add(t - t0);
self.total_wait_time.write_at(stats2, p);
let stats = self.wait_time.read_at(p);
let stats2 = stats.add(t - t1);
self.wait_time.write_at(stats2, p);
}
fn enqueue_initiate(&self, item: T, p: &Point) -> simulation::Result<QueueItem<T>> {
let t = p.time;
let c = self.enqueue_count.read_at(p);
self.enqueue_count.write_at(c + 1, p);
self.enqueue_initiated_source
.trigger_at(&item, p)?;
Result::Ok(QueueItem {
value: item,
input_time: t,
storing_time: t
})
}
fn enqueue_store(&self, item: QueueItem<T>, p: &Point) -> simulation::Result<()> {
let t = p.time;
let i2 = QueueItem {
value: item.value,
input_time: item.input_time,
storing_time: t
};
self.queue_store.push(i2.clone(), p);
let c = self.count.read_at(p);
let c2 = c + 1;
self.count.write_at(c2, p);
let stats = self.count_stats.read_at(p);
let stats2 = stats.add(t, c2);
self.count_stats.write_at(stats2, p);
let sc = self.enqueue_store_count.read_at(p);
let sc2 = sc + 1;
self.enqueue_store_count.write_at(sc2, p);
self.enqueue_stat(&i2, p);
release_resource_within_event(self.dequeue_resource.clone())
.call_event(p)?;
self.enqueue_stored_source
.trigger_at(&i2.value, p)
}
fn enqueue_store_with_priority(&self, pm: SM::Priority, item: QueueItem<T>, p: &Point) -> simulation::Result<()> {
let t = p.time;
let i2 = QueueItem {
value: item.value,
input_time: item.input_time,
storing_time: t
};
self.queue_store.push_with_priority(pm, i2.clone(), p);
let c = self.count.read_at(p);
let c2 = c + 1;
self.count.write_at(c2, p);
let stats = self.count_stats.read_at(p);
let stats2 = stats.add(t, c2);
self.count_stats.write_at(stats2, p);
let sc = self.enqueue_store_count.read_at(p);
let sc2 = sc + 1;
self.enqueue_store_count.write_at(sc2, p);
self.enqueue_stat(&i2, p);
release_resource_within_event(self.dequeue_resource.clone())
.call_event(p)?;
self.enqueue_stored_source
.trigger_at(&i2.value, p)
}
fn enqueue_deny(&self, item: T, p: &Point) -> simulation::Result<()> {
let c = self.enqueue_lost_count.read_at(p);
let c2 = c + 1;
self.enqueue_lost_count.write_at(c2, p);
self.enqueue_lost_source
.trigger_at(&item, p)
}
fn enqueue_stat(&self, i: &QueueItem<T>, p: &Point) {
let t0 = i.input_time;
let t1 = i.storing_time;
let stats = self.enqueue_wait_time.read_at(p);
let stats2 = stats.add(t1 - t0);
self.enqueue_wait_time.write_at(stats2, p);
}
pub fn reset(queue: Grc<Self>) -> impl Event<Item = ()> + Clone {
cons_event(move |p| {
let t = p.time;
let count = queue.count.read_at(p);
queue.count_stats.write_at(TimingStats::from_sample(t, count), p);
queue.enqueue_count.write_at(0, p);
queue.enqueue_lost_count.write_at(0, p);
queue.enqueue_store_count.write_at(0, p);
queue.dequeue_count.write_at(0, p);
queue.dequeue_extract_count.write_at(0, p);
queue.wait_time.write_at(SamplingStats::empty(), p);
queue.total_wait_time.write_at(SamplingStats::empty(), p);
queue.enqueue_wait_time.write_at(SamplingStats::empty(), p);
queue.dequeue_wait_time.write_at(SamplingStats::empty(), p);
Result::Ok(())
})
}
pub fn wait_while_full(queue: Grc<Self>) -> impl Process<Item = ()> {
Queue::is_full(queue.clone())
.into_process()
.and_then(move |x| {
if x {
process_await(queue.dequeue_extracted())
.and_then(move |_| {
Queue::wait_while_full(queue)
})
.into_boxed()
} else {
return_process(())
.into_boxed()
}
})
}
}
#[derive(Clone)]
pub struct NewQueue<SI, SM, SO, T> {
enqueue_strategy: SI,
storing_strategy: SM,
dequeue_strategy: SO,
max_count: isize,
_phantom: PhantomData<T>
}
impl<SI, SM, SO, T> Event for NewQueue<SI, SM, SO, T>
where SI: QueueStrategy + 'static,
SM: QueueStrategy,
SO: QueueStrategy + 'static,
T: 'static
{
type Item = Queue<SI, SM, SO, T>;
#[doc(hidden)]
#[inline]
fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
let NewQueue { enqueue_strategy, storing_strategy, dequeue_strategy, max_count, _phantom } = self;
if max_count < 0 {
let msg = String::from("The queue capacity cannot be actually negative");
let err = Error::retry(msg);
Result::Err(err)
} else {
let t = p.time;
let enqueue_resource = {
Resource::<SI>::new_with_max_count(enqueue_strategy, max_count, Some(max_count))
.call_simulation(p.run)?
};
let queue_store = storing_strategy.new_storage();
let dequeue_resource = {
Resource::<SO>::new_with_max_count(dequeue_strategy, 0, Some(max_count))
.call_simulation(p.run)?
};
Result::Ok(Queue {
max_count: max_count,
enqueue_resource: Grc::new(enqueue_resource),
queue_store: queue_store,
dequeue_resource: Grc::new(dequeue_resource),
count: RefComp::new(0),
count_stats: RefComp::new(TimingStats::from_sample(t, 0)),
enqueue_count: RefComp::new(0),
enqueue_lost_count: RefComp::new(0),
enqueue_store_count: RefComp::new(0),
dequeue_count: RefComp::new(0),
dequeue_extract_count: RefComp::new(0),
wait_time: RefComp::new(SamplingStats::empty()),
total_wait_time: RefComp::new(SamplingStats::empty()),
enqueue_wait_time: RefComp::new(SamplingStats::empty()),
dequeue_wait_time: RefComp::new(SamplingStats::empty()),
enqueue_initiated_source: ObservableSource::new(),
enqueue_lost_source: ObservableSource::new(),
enqueue_stored_source: ObservableSource::new(),
dequeue_requested_source: ObservableSource::new(),
dequeue_extracted_source: ObservableSource::new()
})
}
}
}