use std::marker::PhantomData;
use crate::simulation;
use crate::simulation::Point;
use crate::simulation::ref_comp::RefComp;
use crate::simulation::observable::*;
use crate::simulation::observable::source::*;
use crate::simulation::simulation::*;
use crate::simulation::event::*;
use crate::simulation::process::*;
use crate::simulation::strategy::*;
use crate::simulation::resource::*;
use dvcompute_utils::simulation::stats::*;
use dvcompute_utils::grc::Grc;
pub type FCFSQueue<T> = Queue<FCFSStrategy, FCFSStrategy, T>;
pub type LCFSQueue<T> = Queue<LCFSStrategy, FCFSStrategy, T>;
pub struct Queue<SM, SO, T>
where SM: QueueStrategy,
SO: QueueStrategy + 'static,
T: 'static
{
queue_store: QueueStorageBox<QueueItem<T>, SM::Priority>,
dequeue_resource: Grc<Resource<SO>>,
count: RefComp<isize>,
count_stats: RefComp<TimingStats<isize>>,
enqueue_store_count: RefComp<isize>,
dequeue_count: RefComp<isize>,
dequeue_extract_count: RefComp<isize>,
wait_time: RefComp<SamplingStats<f64>>,
dequeue_wait_time: RefComp<SamplingStats<f64>>,
enqueue_stored_source: ObservableSource<T>,
dequeue_requested_source: ObservableSource<()>,
dequeue_extracted_source: ObservableSource<T>
}
#[derive(Clone)]
struct QueueItem<T> {
value: T,
storing_time: f64
}
#[inline]
pub fn new_fcfs_queue<T>() -> NewQueue<FCFSStrategy, FCFSStrategy, T>
where T: 'static
{
NewQueue {
storing_strategy: FCFSStrategy::Instance,
dequeue_strategy: FCFSStrategy::Instance,
_phantom: PhantomData
}
}
#[inline]
pub fn new_lcfs_queue<T>() -> NewQueue<LCFSStrategy, FCFSStrategy, T>
where T: 'static
{
NewQueue {
storing_strategy: LCFSStrategy::Instance,
dequeue_strategy: FCFSStrategy::Instance,
_phantom: PhantomData
}
}
impl<SM, SO, T> Queue<SM, SO, T>
where SM: QueueStrategy + 'static,
SO: QueueStrategy + 'static,
T: Clone + 'static
{
#[inline]
pub fn new(storing_strategy: SM, dequeue_strategy: SO) -> NewQueue<SM, SO, T> {
NewQueue {
storing_strategy: storing_strategy,
dequeue_strategy: dequeue_strategy,
_phantom: PhantomData
}
}
#[inline]
pub fn is_empty(queue: Grc<Self>) -> impl Event<Item = bool> + Clone {
cons_event(move |p| {
Result::Ok(queue.count.read_at(p) == 0)
})
}
#[inline]
pub fn is_empty_changed(queue: Grc<Self>) -> impl Observable<Message = bool> + Clone {
queue.is_empty_changed_()
.mapc(move |()| {
Queue::is_empty(queue.clone())
})
}
#[inline]
pub fn is_empty_changed_(&self) -> impl Observable<Message = ()> + Clone {
self.count_changed_()
}
#[inline]
pub fn count(queue: Grc<Self>) -> impl Event<Item = isize> + Clone {
cons_event(move |p| {
Result::Ok(queue.count.read_at(p))
})
}
#[inline]
pub fn count_stats(queue: Grc<Self>) -> impl Event<Item = TimingStats<isize>> + Clone {
cons_event(move |p| {
Result::Ok(queue.count_stats.read_at(p))
})
}
#[inline]
pub fn count_changed(queue: Grc<Self>) -> impl Observable<Message = isize> + Clone {
queue.count_changed_()
.mapc(move |()| {
Queue::count(queue.clone())
})
}
#[inline]
pub fn count_changed_(&self) -> impl Observable<Message = ()> + Clone {
self.enqueue_stored().map(|_| {})
.merge(self.dequeue_extracted().map(|_| {}))
}
#[inline]
pub fn enqueue_store_count(queue: Grc<Self>) -> impl Event<Item = isize> + Clone {
cons_event(move |p| {
Result::Ok(queue.enqueue_store_count.read_at(p))
})
}
#[inline]
pub fn enqueue_store_count_changed(queue: Grc<Self>) -> impl Observable<Message = isize> + Clone {
queue.enqueue_store_count_changed_()
.mapc(move |()| {
Queue::enqueue_store_count(queue.clone())
})
}
#[inline]
pub fn enqueue_store_count_changed_(&self) -> impl Observable<Message = ()> + Clone {
self.enqueue_stored().map(|_| {})
}
#[inline]
pub fn dequeue_count(queue: Grc<Self>) -> impl Event<Item = isize> + Clone {
cons_event(move |p| {
Result::Ok(queue.dequeue_count.read_at(p))
})
}
#[inline]
pub fn dequeue_count_changed(queue: Grc<Self>) -> impl Observable<Message = isize> + Clone {
queue.dequeue_count_changed_()
.mapc(move |()| {
Queue::dequeue_count(queue.clone())
})
}
#[inline]
pub fn dequeue_count_changed_(&self) -> impl Observable<Message = ()> + Clone {
self.dequeue_requested()
}
#[inline]
pub fn dequeue_extract_count(queue: Grc<Self>) -> impl Event<Item = isize> + Clone {
cons_event(move |p| {
Result::Ok(queue.dequeue_extract_count.read_at(p))
})
}
#[inline]
pub fn dequeue_extract_count_changed(queue: Grc<Self>) -> impl Observable<Message = isize> + Clone {
queue.dequeue_extract_count_changed_()
.mapc(move |()| {
Queue::dequeue_extract_count(queue.clone())
})
}
#[inline]
pub fn dequeue_extract_count_changed_(&self) -> impl Observable<Message = ()> + Clone {
self.dequeue_extracted().map(|_| {})
}
#[inline]
pub fn store_rate(queue: Grc<Self>) -> impl Event<Item = f64> + Clone {
cons_event(move |p| {
Result::Ok({
let x = queue.enqueue_store_count.read_at(p);
let t0 = p.run.specs.start_time;
let t = p.time;
(x as f64) / (t - t0)
})
})
}
#[inline]
pub fn dequeue_rate(queue: Grc<Self>) -> impl Event<Item = f64> + Clone {
cons_event(move |p| {
Result::Ok({
let x = queue.dequeue_count.read_at(p);
let t0 = p.run.specs.start_time;
let t = p.time;
(x as f64) / (t - t0)
})
})
}
#[inline]
pub fn dequeue_extract_rate(queue: Grc<Self>) -> impl Event<Item = f64> + Clone {
cons_event(move |p| {
Result::Ok({
let x = queue.dequeue_extract_count.read_at(p);
let t0 = p.run.specs.start_time;
let t = p.time;
(x as f64) / (t - t0)
})
})
}
#[inline]
pub fn wait_time(queue: Grc<Self>) -> impl Event<Item = SamplingStats<f64>> + Clone {
cons_event(move |p| {
Result::Ok(queue.wait_time.read_at(p))
})
}
#[inline]
pub fn wait_time_changed(queue: Grc<Self>) -> impl Observable<Message = SamplingStats<f64>> + Clone {
queue.wait_time_changed_()
.mapc(move |()| {
Queue::wait_time(queue.clone())
})
}
#[inline]
pub fn wait_time_changed_(&self) -> impl Observable<Message = ()> + Clone {
self.dequeue_extracted().map(|_| {})
}
#[inline]
pub fn dequeue_wait_time(queue: Grc<Self>) -> impl Event<Item = SamplingStats<f64>> + Clone {
cons_event(move |p| {
Result::Ok(queue.dequeue_wait_time.read_at(p))
})
}
#[inline]
pub fn dequeue_wait_time_changed(queue: Grc<Self>) -> impl Observable<Message = SamplingStats<f64>> + Clone {
queue.dequeue_wait_time_changed_()
.mapc(move |()| {
Queue::dequeue_wait_time(queue.clone())
})
}
#[inline]
pub fn dequeue_wait_time_changed_(&self) -> impl Observable<Message = ()> + Clone {
self.dequeue_extracted().map(|_| {})
}
#[inline]
pub fn rate(queue: Grc<Self>) -> impl Event<Item = f64> + Clone {
cons_event(move |p| {
Result::Ok({
let x = queue.count_stats.read_at(p);
let y = queue.wait_time.read_at(p);
x.mean() / y.mean
})
})
}
#[inline]
pub fn rate_changed(queue: Grc<Self>) -> impl Observable<Message = f64> + Clone {
queue.rate_changed_()
.mapc(move |()| {
Queue::rate(queue.clone())
})
}
#[inline]
pub fn rate_changed_(&self) -> impl Observable<Message = ()> + Clone {
self.enqueue_stored().map(|_| {})
.merge(self.dequeue_extracted().map(|_| {}))
}
pub fn dequeue(queue: Grc<Self>) -> impl Process<Item = T> {
cons_event({
let queue = queue.clone();
move |p| {
queue.dequeue_request(p)
}
})
.into_process()
.and_then(move |t| {
request_resource(queue.dequeue_resource.clone())
.and_then(move |()| {
cons_event(move |p| {
queue.dequeue_extract(t, p)
})
.into_process()
})
})
}
pub fn dequeue_with_output_priority(queue: Grc<Self>, po: SO::Priority) -> impl Process<Item = T>
where SO::Priority: Clone
{
cons_event({
let queue = queue.clone();
move |p| {
queue.dequeue_request(p)
}
})
.into_process()
.and_then(move |t| {
request_resource_with_priority(queue.dequeue_resource.clone(), po)
.and_then(move |()| {
cons_event(move |p| {
queue.dequeue_extract(t, p)
})
.into_process()
})
})
}
pub fn try_dequeue(queue: Grc<Self>) -> impl Event<Item = Option<T>> {
try_request_resource_within_event(queue.dequeue_resource.clone())
.and_then(move |f| {
if f {
cons_event(move |p| {
let t = queue.dequeue_request(p)?;
let x = queue.dequeue_extract(t, p)?;
Result::Ok(Some(x))
}).into_boxed()
} else {
return_event(None)
.into_boxed()
}
})
}
pub fn delete(queue: Grc<Self>, item: T) -> impl Event<Item = bool>
where T: PartialEq
{
let pred = move |x: &T| { *x == item };
Queue::delete_by(queue, pred)
.map(|x| { x.is_some() })
}
pub fn delete_(queue: Grc<Self>, item: T) -> impl Event<Item = ()>
where T: PartialEq
{
let pred = move |x: &T| { *x == item };
Queue::delete_by(queue, pred)
.map(|_| ())
}
pub fn delete_by<F>(queue: Grc<Self>, pred: F) -> impl Event<Item = Option<T>>
where F: Fn(&T) -> bool + 'static
{
try_request_resource_within_event(queue.dequeue_resource.clone())
.and_then(move |f| {
if f {
cons_event(move |p| {
let pred = move |x: &QueueItem<T>| { pred(&x.value) };
let pred = Box::new(pred);
match queue.queue_store.remove_boxed_by(pred, p) {
None => {
release_resource_within_event(queue.dequeue_resource.clone())
.call_event(p)?;
Result::Ok(None)
},
Some(i) => {
let t = queue.dequeue_request(p)?;
let x = queue.dequeue_post_extract(t, i, p)?;
Result::Ok(Some(x))
}
}
}).into_boxed()
} else {
return_event(None)
.into_boxed()
}
})
}
pub fn exists<F>(queue: Grc<Self>, pred: F) -> impl Event<Item = bool>
where F: Fn(&T) -> bool + 'static
{
cons_event(move |p| {
let pred = move |x: &QueueItem<T>| { pred(&x.value) };
let pred = Box::new(pred);
Result::Ok(queue.queue_store.exists_boxed(pred, p))
})
}
pub fn find<F>(queue: Grc<Self>, pred: F) -> impl Event<Item = Option<T>>
where F: Fn(&T) -> bool + 'static,
T: Clone
{
cons_event(move |p| {
let pred = move |x: &QueueItem<T>| { pred(&x.value) };
let pred = Box::new(pred);
Result::Ok(queue.queue_store.find_boxed(pred, p).map(|x| { x.value.clone() }))
})
}
pub fn clear(queue: Grc<Self>) -> impl Event<Item = ()> {
cons_event(move |p| {
loop {
let x = Queue::try_dequeue(queue.clone()).call_event(p)?;
match x {
None => return Result::Ok(()),
Some(_) => {}
}
}
})
}
#[inline]
pub fn enqueue(queue: Grc<Self>, item: T) -> impl Event<Item = ()> {
cons_event(move |p| {
queue.enqueue_store(item, p)
})
}
#[inline]
pub fn enqueue_with_storing_priority(queue: Grc<Self>, pm: SM::Priority, item: T) -> impl Event<Item = ()>
where SM::Priority: Clone
{
cons_event(move |p| {
queue.enqueue_store_with_priority(pm, item, p)
})
}
#[inline]
pub fn enqueue_stored(&self) -> impl Observable<Message = T> + Clone {
self.enqueue_stored_source.publish()
}
#[inline]
pub fn dequeue_requested(&self) -> impl Observable<Message = ()> + Clone {
self.dequeue_requested_source.publish()
}
#[inline]
pub fn dequeue_extracted(&self) -> impl Observable<Message = T> + Clone {
self.dequeue_extracted_source.publish()
}
#[inline]
pub fn changed_(&self) -> impl Observable<Message = ()> + Clone {
self.enqueue_stored().map(|_| {})
.merge(self.dequeue_requested())
.merge(self.dequeue_extracted().map(|_| {}))
}
fn dequeue_request(&self, p: &Point) -> simulation::Result<f64> {
let c = self.dequeue_count.read_at(p);
let c2 = c + 1;
self.dequeue_count.write_at(c2, p);
self.dequeue_requested_source.trigger_at(&(), p)?;
Result::Ok(p.time)
}
fn dequeue_extract(&self, t_r: f64, p: &Point) -> simulation::Result<T> {
let i = self.queue_store.pop(p).unwrap();
self.dequeue_post_extract(t_r, i, p)
}
fn dequeue_post_extract(&self, t_r: f64, i: QueueItem<T>, p: &Point) -> simulation::Result<T> {
let t = p.time;
let c = self.count.read_at(p);
let c2 = c - 1;
let stats = self.count_stats.read_at(p);
let stats2 = stats.add(t, c2);
let ec = self.dequeue_extract_count.read_at(p);
let ec2 = ec + 1;
self.count.write_at(c2, p);
self.count_stats.write_at(stats2, p);
self.dequeue_extract_count.write_at(ec2, p);
self.dequeue_stat(t_r, &i, p);
self.dequeue_extracted_source
.trigger_at(&i.value, p)?;
Result::Ok(i.value)
}
fn dequeue_stat(&self, t_r: f64, i: &QueueItem<T>, p: &Point) {
let t1 = i.storing_time;
let t = p.time;
let stats = self.dequeue_wait_time.read_at(p);
let stats2 = stats.add(t - t_r);
self.dequeue_wait_time.write_at(stats2, p);
let stats = self.wait_time.read_at(p);
let stats2 = stats.add(t - t1);
self.wait_time.write_at(stats2, p);
}
fn enqueue_store(&self, item: T, p: &Point) -> simulation::Result<()> {
let t = p.time;
let i2 = QueueItem {
value: item,
storing_time: t
};
self.queue_store.push(i2.clone(), p);
let c = self.count.read_at(p);
let c2 = c + 1;
self.count.write_at(c2, p);
let stats = self.count_stats.read_at(p);
let stats2 = stats.add(t, c2);
self.count_stats.write_at(stats2, p);
let sc = self.enqueue_store_count.read_at(p);
let sc2 = sc + 1;
self.enqueue_store_count.write_at(sc2, p);
release_resource_within_event(self.dequeue_resource.clone())
.call_event(p)?;
self.enqueue_stored_source
.trigger_at(&i2.value, p)
}
fn enqueue_store_with_priority(&self, pm: SM::Priority, item: T, p: &Point) -> simulation::Result<()> {
let t = p.time;
let i2 = QueueItem {
value: item,
storing_time: t
};
self.queue_store.push_with_priority(pm, i2.clone(), p);
let c = self.count.read_at(p);
let c2 = c + 1;
self.count.write_at(c2, p);
let stats = self.count_stats.read_at(p);
let stats2 = stats.add(t, c2);
self.count_stats.write_at(stats2, p);
let sc = self.enqueue_store_count.read_at(p);
let sc2 = sc + 1;
self.enqueue_store_count.write_at(sc2, p);
release_resource_within_event(self.dequeue_resource.clone())
.call_event(p)?;
self.enqueue_stored_source
.trigger_at(&i2.value, p)
}
pub fn reset(queue: Grc<Self>) -> impl Event<Item = ()> + Clone {
cons_event(move |p| {
let t = p.time;
let count = queue.count.read_at(p);
queue.count_stats.write_at(TimingStats::from_sample(t, count), p);
queue.enqueue_store_count.write_at(0, p);
queue.dequeue_count.write_at(0, p);
queue.dequeue_extract_count.write_at(0, p);
queue.wait_time.write_at(SamplingStats::empty(), p);
queue.dequeue_wait_time.write_at(SamplingStats::empty(), p);
Result::Ok(())
})
}
}
#[derive(Clone)]
pub struct NewQueue<SM, SO, T> {
storing_strategy: SM,
dequeue_strategy: SO,
_phantom: PhantomData<T>
}
impl<SM, SO, T> Event for NewQueue<SM, SO, T>
where SM: QueueStrategy,
SO: QueueStrategy + 'static,
T: 'static
{
type Item = Queue<SM, SO, T>;
#[doc(hidden)]
#[inline]
fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
let NewQueue { storing_strategy, dequeue_strategy, _phantom } = self;
let t = p.time;
let queue_store = storing_strategy.new_storage();
let dequeue_resource = {
Resource::<SO>::new_with_max_count(dequeue_strategy, 0, None)
.call_simulation(p.run)?
};
Result::Ok(Queue {
queue_store: queue_store,
dequeue_resource: Grc::new(dequeue_resource),
count: RefComp::new(0),
count_stats: RefComp::new(TimingStats::from_sample(t, 0)),
enqueue_store_count: RefComp::new(0),
dequeue_count: RefComp::new(0),
dequeue_extract_count: RefComp::new(0),
wait_time: RefComp::new(SamplingStats::empty()),
dequeue_wait_time: RefComp::new(SamplingStats::empty()),
enqueue_stored_source: ObservableSource::new(),
dequeue_requested_source: ObservableSource::new(),
dequeue_extracted_source: ObservableSource::new()
})
}
}