use std::marker::PhantomData;
use crate::simulation;
use crate::simulation::error::*;
use crate::simulation::Point;
use crate::simulation::ref_comp::RefComp;
use crate::simulation::simulation::*;
use crate::simulation::event::*;
use crate::simulation::process::*;
use crate::simulation::strategy::*;
use crate::simulation::resource::*;
use dvcompute_utils::grc::Grc;
pub mod stats;
pub mod unbounded;
pub type FCFSQueue<T> = Queue<FCFSStrategy, FCFSStrategy, FCFSStrategy, T>;
pub type LCFSQueue<T> = Queue<FCFSStrategy, LCFSStrategy, FCFSStrategy, T>;
pub struct Queue<SI, SM, SO, T>
where SI: QueueStrategy + 'static,
SM: QueueStrategy,
SO: QueueStrategy + 'static
{
max_count: isize,
enqueue_resource: Grc<Resource<SI>>,
queue_store: QueueStorageBox<T, SM::Priority>,
dequeue_resource: Grc<Resource<SO>>,
count: RefComp<isize>
}
#[inline]
pub fn new_fcfs_queue<T>(max_count: isize) -> NewQueue<FCFSStrategy, FCFSStrategy, FCFSStrategy, T>
where T: 'static
{
NewQueue {
enqueue_strategy: FCFSStrategy::Instance,
storing_strategy: FCFSStrategy::Instance,
dequeue_strategy: FCFSStrategy::Instance,
max_count: max_count,
_phantom: PhantomData
}
}
#[inline]
pub fn new_lcfs_queue<T>(max_count: isize) -> NewQueue<FCFSStrategy, LCFSStrategy, FCFSStrategy, T>
where T: 'static
{
NewQueue {
enqueue_strategy: FCFSStrategy::Instance,
storing_strategy: LCFSStrategy::Instance,
dequeue_strategy: FCFSStrategy::Instance,
max_count: max_count,
_phantom: PhantomData
}
}
impl<SI, SM, SO, T> Queue<SI, SM, SO, T>
where SI: QueueStrategy + 'static,
SM: QueueStrategy + 'static,
SO: QueueStrategy + 'static,
T: Clone + 'static
{
#[inline]
pub fn new(enqueue_strategy: SI,
storing_strategy: SM,
dequeue_strategy: SO,
max_count: isize) -> NewQueue<SI, SM, SO, T>
{
NewQueue {
enqueue_strategy: enqueue_strategy,
storing_strategy: storing_strategy,
dequeue_strategy: dequeue_strategy,
max_count: max_count,
_phantom: PhantomData
}
}
#[inline]
pub fn max_count(&self) -> isize {
self.max_count
}
#[inline]
pub fn is_empty(queue: Grc<Self>) -> impl Event<Item = bool> + Clone {
cons_event(move |p| {
Result::Ok(queue.count.read_at(p) == 0)
})
}
#[inline]
pub fn is_full(queue: Grc<Self>) -> impl Event<Item = bool> + Clone {
cons_event(move |p| {
Result::Ok(queue.count.read_at(p) == queue.max_count)
})
}
#[inline]
pub fn count(queue: Grc<Self>) -> impl Event<Item = isize> + Clone {
cons_event(move |p| {
Result::Ok(queue.count.read_at(p))
})
}
#[inline]
pub fn load_factor(queue: Grc<Self>) -> impl Event<Item = f64> + Clone {
cons_event(move |p| {
Result::Ok({
let x = queue.count.read_at(p);
let y = queue.max_count;
(x as f64) / (y as f64)
})
})
}
pub fn dequeue(queue: Grc<Self>) -> impl Process<Item = T> {
request_resource(queue.dequeue_resource.clone())
.and_then(move |()| {
cons_event(move |p| {
queue.dequeue_extract(p)
})
.into_process()
})
}
pub fn dequeue_with_output_priority(queue: Grc<Self>, po: SO::Priority) -> impl Process<Item = T>
where SO::Priority: Clone
{
request_resource_with_priority(queue.dequeue_resource.clone(), po)
.and_then(move |()| {
cons_event(move |p| {
queue.dequeue_extract(p)
})
.into_process()
})
}
pub fn try_dequeue(queue: Grc<Self>) -> impl Event<Item = Option<T>> {
try_request_resource_within_event(queue.dequeue_resource.clone())
.and_then(move |f| {
if f {
cons_event(move |p| {
let x = queue.dequeue_extract(p)?;
Result::Ok(Some(x))
}).into_boxed()
} else {
return_event(None)
.into_boxed()
}
})
}
pub fn delete(queue: Grc<Self>, item: T) -> impl Event<Item = bool>
where T: PartialEq
{
let pred = move |x: &T| { *x == item };
Queue::delete_by(queue, pred)
.map(|x| { x.is_some() })
}
pub fn delete_(queue: Grc<Self>, item: T) -> impl Event<Item = ()>
where T: PartialEq
{
let pred = move |x: &T| { *x == item };
Queue::delete_by(queue, pred)
.map(|_| ())
}
pub fn delete_by<F>(queue: Grc<Self>, pred: F) -> impl Event<Item = Option<T>>
where F: Fn(&T) -> bool + 'static
{
try_request_resource_within_event(queue.dequeue_resource.clone())
.and_then(move |f| {
if f {
cons_event(move |p| {
let pred = move |x: &T| { pred(x) };
let pred = Box::new(pred);
match queue.queue_store.remove_boxed_by(pred, p) {
None => {
release_resource_within_event(queue.dequeue_resource.clone())
.call_event(p)?;
Result::Ok(None)
},
Some(i) => {
let x = queue.dequeue_post_extract(i, p)?;
Result::Ok(Some(x))
}
}
}).into_boxed()
} else {
return_event(None)
.into_boxed()
}
})
}
pub fn exists<F>(queue: Grc<Self>, pred: F) -> impl Event<Item = bool>
where F: Fn(&T) -> bool + 'static
{
cons_event(move |p| {
let pred = move |x: &T| { pred(x) };
let pred = Box::new(pred);
Result::Ok(queue.queue_store.exists_boxed(pred, p))
})
}
pub fn find<F>(queue: Grc<Self>, pred: F) -> impl Event<Item = Option<T>>
where F: Fn(&T) -> bool + 'static,
T: Clone
{
cons_event(move |p| {
let pred = move |x: &T| { pred(x) };
let pred = Box::new(pred);
Result::Ok(queue.queue_store.find_boxed(pred, p).map(|x| { x.clone() }))
})
}
pub fn clear(queue: Grc<Self>) -> impl Event<Item = ()> {
cons_event(move |p| {
loop {
let x = Queue::try_dequeue(queue.clone()).call_event(p)?;
match x {
None => return Result::Ok(()),
Some(_) => {}
}
}
})
}
pub fn enqueue(queue: Grc<Self>, item: T) -> impl Process<Item = ()> {
request_resource(queue.enqueue_resource.clone())
.and_then(move |()| {
cons_event(move |p| {
queue.enqueue_store(item, p)
})
.into_process()
})
}
pub fn enqueue_with_input_priority(queue: Grc<Self>, pi: SI::Priority, item: T) -> impl Process<Item = ()>
where SI::Priority: Clone
{
request_resource_with_priority(queue.enqueue_resource.clone(), pi)
.and_then(move |()| {
cons_event(move |p| {
queue.enqueue_store(item, p)
})
.into_process()
})
}
pub fn enqueue_with_storing_priority(queue: Grc<Self>, pm: SM::Priority, item: T) -> impl Process<Item = ()>
where SM::Priority: Clone
{
request_resource(queue.enqueue_resource.clone())
.and_then(move |()| {
cons_event(move |p| {
queue.enqueue_store_with_priority(pm, item, p)
})
.into_process()
})
}
pub fn enqueue_with_input_and_storing_priorities(queue: Grc<Self>, pi: SI::Priority, pm: SM::Priority, item: T) -> impl Process<Item = ()>
where SI::Priority: Clone,
SM::Priority: Clone
{
request_resource_with_priority(queue.enqueue_resource.clone(), pi)
.and_then(move |()| {
cons_event(move |p| {
queue.enqueue_store_with_priority(pm, item, p)
})
.into_process()
})
}
pub fn try_enqueue(queue: Grc<Self>, item: T) -> impl Event<Item = bool> {
cons_event(move |p| {
let x = {
try_request_resource_within_event(queue.enqueue_resource.clone())
.call_event(p)
}?;
if x {
queue.enqueue_store(item, p)?;
Result::Ok(true)
} else {
Result::Ok(false)
}
})
}
pub fn try_enqueue_with_storing_priority(queue: Grc<Self>, pm: SM::Priority, item: T) -> impl Event<Item = bool> {
cons_event(move |p| {
let x = {
try_request_resource_within_event(queue.enqueue_resource.clone())
.call_event(p)
}?;
if x {
queue.enqueue_store_with_priority(pm, item, p)?;
Result::Ok(true)
} else {
Result::Ok(false)
}
})
}
fn dequeue_extract(&self, p: &Point) -> simulation::Result<T> {
let i = self.queue_store.pop(p).unwrap();
self.dequeue_post_extract(i, p)
}
fn dequeue_post_extract(&self, i: T, p: &Point) -> simulation::Result<T> {
let c = self.count.read_at(p);
let c2 = c - 1;
self.count.write_at(c2, p);
release_resource_within_event(self.enqueue_resource.clone())
.call_event(p)?;
Result::Ok(i)
}
fn enqueue_store(&self, item: T, p: &Point) -> simulation::Result<()> {
self.queue_store.push(item, p);
let c = self.count.read_at(p);
let c2 = c + 1;
self.count.write_at(c2, p);
release_resource_within_event(self.dequeue_resource.clone())
.call_event(p)
}
fn enqueue_store_with_priority(&self, pm: SM::Priority, item: T, p: &Point) -> simulation::Result<()> {
self.queue_store.push_with_priority(pm, item, p);
let c = self.count.read_at(p);
let c2 = c + 1;
self.count.write_at(c2, p);
release_resource_within_event(self.dequeue_resource.clone())
.call_event(p)
}
}
#[derive(Clone)]
pub struct NewQueue<SI, SM, SO, T> {
enqueue_strategy: SI,
storing_strategy: SM,
dequeue_strategy: SO,
max_count: isize,
_phantom: PhantomData<T>
}
impl<SI, SM, SO, T> Event for NewQueue<SI, SM, SO, T>
where SI: QueueStrategy + 'static,
SM: QueueStrategy,
SO: QueueStrategy + 'static,
T: 'static
{
type Item = Queue<SI, SM, SO, T>;
#[doc(hidden)]
#[inline]
fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
let NewQueue { enqueue_strategy, storing_strategy, dequeue_strategy, max_count, _phantom } = self;
if max_count < 0 {
let msg = String::from("The queue capacity cannot be actually negative");
let err = Error::retry(msg);
Result::Err(err)
} else {
let enqueue_resource = {
Resource::<SI>::new_with_max_count(enqueue_strategy, max_count, Some(max_count))
.call_simulation(p.run)?
};
let queue_store = storing_strategy.new_storage();
let dequeue_resource = {
Resource::<SO>::new_with_max_count(dequeue_strategy, 0, Some(max_count))
.call_simulation(p.run)?
};
Result::Ok(Queue {
max_count: max_count,
enqueue_resource: Grc::new(enqueue_resource),
queue_store: queue_store,
dequeue_resource: Grc::new(dequeue_resource),
count: RefComp::new(0)
})
}
}
}