use std::hash::{Hash, Hasher};
use dvcompute::simulation;
use dvcompute::simulation::ref_comp::RefComp;
use dvcompute::simulation::Point;
use dvcompute::simulation::event::*;
use dvcompute::simulation::observable::*;
use dvcompute::simulation::observable::source::*;
use dvcompute_utils::simulation::stats::*;
use dvcompute_utils::grc::Grc;
use crate::simulation::transact::*;
/// A queue entity that tracks its current content and collects statistics
/// about enqueue and dequeue operations.
///
/// Every piece of mutable state is kept in a `RefComp` cell that is read and
/// written at a simulation `Point`.
pub struct Queue {
/// Sequence number taken from the run's generator when the queue is created;
/// this is the value fed to the hasher by the `Hash` implementation.
pub sequence_no: u64,
/// Current content: raised by the increment of each enqueue and lowered by
/// the decrement of each dequeue.
content: RefComp<isize>,
/// Timing statistics of the content, extended on every content change and
/// restarted from the current content by `Queue::reset`.
content_stats: RefComp<TimingStats<isize>>,
/// Number of enqueue operations; incremented by one per enqueue.
enqueue_count: RefComp<isize>,
/// Number of entries dequeued at exactly the time they were enqueued.
enqueue_zero_entry_count: RefComp<isize>,
/// Sampling statistics of the wait time (dequeue time minus enqueue time),
/// extended on every dequeue.
wait_time: RefComp<SamplingStats<f64>>,
/// Same as `wait_time`, but skipping entries dequeued at their enqueue time.
non_zero_entry_wait_time: RefComp<SamplingStats<f64>>,
/// Source triggered at the end of every successful enqueue.
enqueued: ObservableSource<()>,
/// Source triggered at the end of every successful dequeue.
dequeued: ObservableSource<()>
}
/// A record of one enqueue operation: the queue that was entered and the
/// simulation time at which that happened.
#[derive(Clone)]
pub struct QueueEntry {
/// The queue that was entered.
pub queue: Grc<Queue>,
/// Simulation time (`Point::time`) at which the enqueue was performed.
pub enqueue_time: f64
}
/// Queues are identified by their `sequence_no`.
///
/// Equality must be keyed on the same field that the `Hash` implementation
/// feeds to the hasher, so that equal queues always hash alike. Comparing the
/// mutable `content` cells would make unrelated queues with the same content
/// compare equal and would make equality change over time.
impl PartialEq for Queue {
    fn eq(&self, other: &Self) -> bool {
        self.sequence_no == other.sequence_no
    }
}

impl Eq for Queue {}
/// Hashes a queue by feeding only its `sequence_no` to the hasher.
impl Hash for Queue {
    fn hash<H: Hasher>(&self, state: &mut H) {
        Hash::hash(&self.sequence_no, state);
    }
}
impl Queue {
    /// Returns the event that creates a new queue when it is run.
    #[inline]
    pub fn new() -> NewQueue {
        NewQueue {}
    }

    /// Returns an event yielding `true` when the current content is zero.
    #[inline]
    pub fn is_empty(queue: Grc<Queue>) -> impl Event<Item = bool> + Clone {
        cons_event(move |p| Ok(queue.content.read_at(p) == 0))
    }

    /// Returns an event yielding the current content of the queue.
    #[inline]
    pub fn content(queue: Grc<Queue>) -> impl Event<Item = isize> + Clone {
        cons_event(move |p| Ok(queue.content.read_at(p)))
    }

    /// Returns an event yielding the timing statistics of the content.
    #[inline]
    pub fn content_stats(queue: Grc<Queue>) -> impl Event<Item = TimingStats<isize>> + Clone {
        cons_event(move |p| Ok(queue.content_stats.read_at(p)))
    }

    /// Maps every `content_changed_` notification through `Queue::content`.
    #[inline]
    pub fn content_changed(queue: Grc<Queue>) -> impl Observable<Message = isize> + Clone {
        let notifications = Queue::content_changed_(&queue);
        notifications.mapc(move |()| Queue::content(queue.clone()))
    }

    /// Merges the `enqueued` and `dequeued` notifications, the two
    /// operations that write the content.
    #[inline]
    pub fn content_changed_(&self) -> impl Observable<Message = ()> + Clone {
        let on_enqueue = self.enqueued();
        on_enqueue.merge(self.dequeued())
    }

    /// Returns an event yielding the number of enqueue operations.
    #[inline]
    pub fn enqueue_count(queue: Grc<Queue>) -> impl Event<Item = isize> + Clone {
        cons_event(move |p| Ok(queue.enqueue_count.read_at(p)))
    }

    /// Maps every `enqueue_count_changed_` notification through
    /// `Queue::enqueue_count`.
    #[inline]
    pub fn enqueue_count_changed(queue: Grc<Queue>) -> impl Observable<Message = isize> + Clone {
        let notifications = Queue::enqueue_count_changed_(&queue);
        notifications.mapc(move |()| Queue::enqueue_count(queue.clone()))
    }

    /// The enqueue counter is written only by enqueue operations, so this
    /// is the `enqueued` notification.
    #[inline]
    pub fn enqueue_count_changed_(&self) -> impl Observable<Message = ()> + Clone {
        self.enqueued()
    }

    /// Returns an event yielding the number of entries that were dequeued at
    /// exactly the time they were enqueued.
    #[inline]
    pub fn enqueue_zero_entry_count(queue: Grc<Queue>) -> impl Event<Item = isize> + Clone {
        cons_event(move |p| Ok(queue.enqueue_zero_entry_count.read_at(p)))
    }

    /// Maps every `enqueue_zero_entry_count_changed_` notification through
    /// `Queue::enqueue_zero_entry_count`.
    #[inline]
    pub fn enqueue_zero_entry_count_changed(queue: Grc<Queue>) -> impl Observable<Message = isize> + Clone {
        let notifications = Queue::enqueue_zero_entry_count_changed_(&queue);
        notifications.mapc(move |()| Queue::enqueue_zero_entry_count(queue.clone()))
    }

    /// The zero-entry counter is updated while dequeuing, so this is the
    /// `dequeued` notification.
    #[inline]
    pub fn enqueue_zero_entry_count_changed_(&self) -> impl Observable<Message = ()> + Clone {
        self.dequeued()
    }

    /// Returns an event yielding the sampling statistics of the wait time.
    #[inline]
    pub fn wait_time(queue: Grc<Queue>) -> impl Event<Item = SamplingStats<f64>> + Clone {
        cons_event(move |p| Ok(queue.wait_time.read_at(p)))
    }

    /// Maps every `wait_time_changed_` notification through
    /// `Queue::wait_time`.
    #[inline]
    pub fn wait_time_changed(queue: Grc<Queue>) -> impl Observable<Message = SamplingStats<f64>> + Clone {
        let notifications = Queue::wait_time_changed_(&queue);
        notifications.mapc(move |()| Queue::wait_time(queue.clone()))
    }

    /// The wait time is sampled while dequeuing, so this is the `dequeued`
    /// notification.
    #[inline]
    pub fn wait_time_changed_(&self) -> impl Observable<Message = ()> + Clone {
        self.dequeued()
    }

    /// Returns an event yielding the wait-time statistics that exclude the
    /// entries dequeued at their enqueue time.
    #[inline]
    pub fn non_zero_entry_wait_time(queue: Grc<Queue>) -> impl Event<Item = SamplingStats<f64>> + Clone {
        cons_event(move |p| Ok(queue.non_zero_entry_wait_time.read_at(p)))
    }

    /// Maps every `non_zero_entry_wait_time_changed_` notification through
    /// `Queue::non_zero_entry_wait_time`.
    #[inline]
    pub fn non_zero_entry_wait_time_changed(queue: Grc<Queue>) -> impl Observable<Message = SamplingStats<f64>> + Clone {
        let notifications = Queue::non_zero_entry_wait_time_changed_(&queue);
        notifications.mapc(move |()| Queue::non_zero_entry_wait_time(queue.clone()))
    }

    /// These statistics are sampled while dequeuing, so this is the
    /// `dequeued` notification.
    #[inline]
    pub fn non_zero_entry_wait_time_changed_(&self) -> impl Observable<Message = ()> + Clone {
        self.dequeued()
    }

    /// Returns an event yielding the mean of the content statistics divided
    /// by the mean of the wait time statistics.
    #[inline]
    pub fn rate(queue: Grc<Queue>) -> impl Event<Item = f64> + Clone {
        cons_event(move |p| {
            let content = queue.content_stats.read_at(p);
            let waiting = queue.wait_time.read_at(p);
            Ok(content.mean() / waiting.mean)
        })
    }

    /// Maps every `rate_changed_` notification through `Queue::rate`.
    #[inline]
    pub fn rate_changed(queue: Grc<Queue>) -> impl Observable<Message = f64> + Clone {
        let notifications = Queue::rate_changed_(&queue);
        notifications.mapc(move |()| Queue::rate(queue.clone()))
    }

    /// Merges the `enqueued` and `dequeued` notifications.
    #[inline]
    pub fn rate_changed_(&self) -> impl Observable<Message = ()> + Clone {
        let on_enqueue = self.enqueued();
        on_enqueue.merge(self.dequeued())
    }

    /// Publishes the source triggered at the end of every successful enqueue.
    #[inline]
    pub fn enqueued(&self) -> impl Observable<Message = ()> + Clone {
        self.enqueued.publish()
    }

    /// Publishes the source triggered at the end of every successful dequeue.
    #[inline]
    pub fn dequeued(&self) -> impl Observable<Message = ()> + Clone {
        self.dequeued.publish()
    }

    /// Returns the event that enqueues the transact, raising the content by
    /// `increment`.
    #[inline]
    pub fn enqueue(queue: Grc<Queue>, transact_id: Grc<TransactId>, increment: isize) -> Enqueue {
        Enqueue { queue, transact_id, increment }
    }

    /// Returns the event that dequeues the transact, lowering the content by
    /// `decrement`.
    #[inline]
    pub fn dequeue(queue: Grc<Queue>, transact_id: Grc<TransactId>, decrement: isize) -> Dequeue {
        Dequeue { queue, transact_id, decrement }
    }

    /// Merges the `enqueued` and `dequeued` notifications.
    #[inline]
    pub fn changed_(&self) -> impl Observable<Message = ()> + Clone {
        let on_enqueue = self.enqueued();
        on_enqueue.merge(self.dequeued())
    }

    /// Returns an event that resets the collected statistics.
    ///
    /// The content itself is left untouched. The content statistics restart
    /// from a single sample of the current content at the current time, the
    /// counters go back to zero and both wait-time statistics are emptied.
    pub fn reset(queue: Grc<Queue>) -> impl Event<Item = ()> + Clone {
        cons_event(move |p| {
            let current = queue.content.read_at(p);
            let restarted = TimingStats::from_sample(p.time, current);
            queue.content_stats.write_at(restarted, p);
            queue.enqueue_count.write_at(0, p);
            queue.enqueue_zero_entry_count.write_at(0, p);
            queue.wait_time.write_at(SamplingStats::empty(), p);
            queue.non_zero_entry_wait_time.write_at(SamplingStats::empty(), p);
            Ok(())
        })
    }
}
/// The event returned by `Queue::new`; its `Event` implementation produces a
/// `Queue` when run.
#[derive(Clone)]
pub struct NewQueue {}
impl Event for NewQueue {
    type Item = Queue;

    /// Builds a queue with zero content, zeroed counters, empty wait-time
    /// statistics and content statistics seeded with a zero sample taken at
    /// the current time. The sequence number comes from the run's generator.
    #[doc(hidden)]
    #[inline]
    fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
        let created_at = p.time;
        let generator = &p.run.generator;
        let sequence_no = generator.random_sequence_no();
        let initial_stats = TimingStats::from_sample(created_at, 0);

        Ok(Queue {
            sequence_no,
            content: RefComp::new(0),
            content_stats: RefComp::new(initial_stats),
            enqueue_count: RefComp::new(0),
            enqueue_zero_entry_count: RefComp::new(0),
            wait_time: RefComp::new(SamplingStats::empty()),
            non_zero_entry_wait_time: RefComp::new(SamplingStats::empty()),
            enqueued: ObservableSource::new(),
            dequeued: ObservableSource::new(),
        })
    }
}
/// The event returned by `Queue::enqueue`.
#[derive(Clone)]
pub struct Enqueue {
/// The queue being entered.
queue: Grc<Queue>,
/// The transact on which the resulting queue entry is registered.
transact_id: Grc<TransactId>,
/// Amount added to the queue content.
increment: isize
}
impl Event for Enqueue {
    type Item = ();

    /// Bumps the enqueue counter, raises the content by `increment`, records
    /// the new content in the content statistics, registers the queue entry
    /// on the transact and finally triggers the `enqueued` source.
    ///
    /// A registration error is returned as is; in that case the source is
    /// not triggered, although the counters have already been updated.
    #[doc(hidden)]
    fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
        let Enqueue { queue, transact_id, increment } = self;
        let now = p.time;
        let entry = QueueEntry { queue: queue.clone(), enqueue_time: now };

        // Read the whole state first, then write it back.
        let count = queue.enqueue_count.read_at(p);
        let old_content = queue.content.read_at(p);
        let old_stats = queue.content_stats.read_at(p);

        queue.enqueue_count.write_at(count + 1, p);
        let new_content = old_content + increment;
        queue.content.write_at(new_content, p);
        queue.content_stats.write_at(old_stats.add(now, new_content), p);

        transact_id.register_queue_entry(entry, p)?;
        queue.enqueued.trigger_at(&(), p)
    }
}
/// The event returned by `Queue::dequeue`.
#[derive(Clone)]
pub struct Dequeue {
/// The queue being left.
queue: Grc<Queue>,
/// The transact from which the queue entry is unregistered.
transact_id: Grc<TransactId>,
/// Amount subtracted from the queue content.
decrement: isize
}
impl Event for Dequeue {
    type Item = ();

    /// Unregisters the queue entry from the transact, lowers the content by
    /// `decrement`, records the new content and the wait time in the
    /// statistics and finally triggers the `dequeued` source.
    ///
    /// An unregistration error is returned as is, before any state of the
    /// queue is touched.
    #[doc(hidden)]
    fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
        let Dequeue { queue, transact_id, decrement } = self;
        let entry = transact_id.unregister_queue_entry(&queue, p)?;

        let now = p.time;
        let entered_at = entry.enqueue_time;
        let waited = now - entered_at;

        // Read the state first, then write it back.
        let old_content = queue.content.read_at(p);
        let old_stats = queue.content_stats.read_at(p);
        let old_wait = queue.wait_time.read_at(p);

        let new_content = old_content - decrement;
        queue.content.write_at(new_content, p);
        queue.content_stats.write_at(old_stats.add(now, new_content), p);
        queue.wait_time.write_at(old_wait.add(waited), p);

        // Entries leaving at the very time they entered are only counted;
        // all others contribute to the non-zero wait-time statistics.
        if now == entered_at {
            let zero_entries = queue.enqueue_zero_entry_count.read_at(p);
            queue.enqueue_zero_entry_count.write_at(zero_entries + 1, p);
        } else {
            let non_zero_wait = queue.non_zero_entry_wait_time.read_at(p);
            queue.non_zero_entry_wait_time.write_at(non_zero_wait.add(waited), p);
        }

        queue.dequeued.trigger_at(&(), p)
    }
}