use std::cell::RefCell;
use std::collections::HashMap;
use dvcompute::simulation;
use dvcompute::simulation::Run;
use dvcompute::simulation::Point;
use dvcompute::simulation::error::*;
use dvcompute::simulation::simulation::*;
use dvcompute::simulation::event::*;
use dvcompute::simulation::process::*;
use dvcompute::simulation::ref_comp::RefComp;
use dvcompute_utils::simulation::arrival::*;
use dvcompute_utils::grc::Grc;
use crate::simulation::queue::*;
use crate::simulation::assembly::*;
#[derive(Clone)]
pub struct Transact<T> {
pub transact_id: Grc<TransactId>,
pub value: T
}
impl<T> Transact<T> {
#[inline]
pub fn new(arrival: Arrival<T>, priority: isize) -> NewTransact<T> {
NewTransact { arrival: arrival, priority: priority }
}
#[inline]
pub fn split(transact: Transact<T>) -> Split<T> {
Split { transact: transact }
}
}
pub struct TransactId {
pub sequence_no: u64,
pub arrival_delay: Option<f64>,
pub arrival_time: f64,
priority: RefComp<isize>,
assembly_set: Grc<RefComp<Option<Grc<AssemblySet>>>>,
preemption_count: RefComp<isize>,
process_id: RefComp<Option<Grc<ProcessId>>>,
process_cont: RefComp<Option<FrozenProcess<()>>>,
queue_entries: RefCell<HashMap<Grc<Queue>, QueueEntry>>
}
impl PartialEq for TransactId {
fn eq(&self, other: &Self) -> bool {
self.process_id == other.process_id
}
}
impl Eq for TransactId {}
impl TransactId {
#[inline]
pub fn priority_at(&self, p: &Point) -> isize {
self.priority.read_at(p)
}
#[inline]
pub fn priority(transact_id: Grc<TransactId>) -> impl Event<Item = isize> {
cons_event(move |p| {
Result::Ok(transact_id.priority_at(p))
})
}
#[inline]
pub fn assign_priority(transact_id: Grc<TransactId>, priority: isize) -> impl Process<Item = ()> {
cons_event(move |p| {
transact_id.priority.write_at(priority, p);
Result::Ok(())
})
.into_process()
.and_then(move |()| {
process_with_priority(priority)
})
}
pub fn register_queue_entry(&self, entry: QueueEntry, _p: &Point) -> simulation::Result<()> {
let mut entries = self.queue_entries.borrow_mut();
if entries.contains_key(&entry.queue) {
let msg = String::from("There is another entry for the specified queue");
Result::Err(Error::retry(msg))
} else {
let _ = entries.insert(entry.queue.clone(), entry);
Result::Ok(())
}
}
pub fn unregister_queue_entry(&self, queue: &Queue, _p: &Point) -> simulation::Result<QueueEntry> {
let mut entries = self.queue_entries.borrow_mut();
match entries.remove(queue) {
None => {
let msg = String::from("There must be entry for the specified queue");
Result::Err(Error::retry(msg))
},
Some(entry) => {
Result::Ok(entry)
}
}
}
pub fn assembly_set(&self, p: &Point) -> simulation::Result<Grc<AssemblySet>> {
match self.assembly_set.read_at(p) {
Some(a) => Result::Ok(a),
None => {
match AssemblySet::new().call_simulation(&p.run) {
Result::Err(e) => Result::Err(e),
Result::Ok(a) => {
let a = Grc::new(a);
self.assembly_set.write_at(Some(a.clone()), p);
Result::Ok(a)
}
}
}
}
}
pub fn take(transact_id: Grc<Self>) -> Take {
Take { transact_id: transact_id }
}
pub fn release(transact_id: Grc<Self>) -> Release {
Release { transact_id: transact_id }
}
#[doc(hidden)]
pub fn begin_preemption_at(&self, p: &Point) -> simulation::Result<()> {
let n = self.preemption_count.read_at(p);
self.preemption_count.write_at(1 + n, p);
match self.process_id.read_at(p) {
None => Result::Ok(()),
Some(pid) => pid.begin_preemption_at(p)
}
}
#[doc(hidden)]
pub fn end_preemption_at(&self, p: &Point) -> simulation::Result<()> {
let n = self.preemption_count.read_at(p);
if n <= 0 {
let msg = String::from("The transact preemption count cannot be negative");
let err = Error::retry(msg);
Result::Err(err)
} else {
self.preemption_count.write_at(n - 1, p);
match self.process_id.read_at(p) {
None => Result::Ok(()),
Some(pid) => {
match pid.end_preemption_at(p) {
Result::Err(e) => Result::Err(e),
Result::Ok(()) => {
match self.process_cont.swap_at(None, p) {
None => Result::Ok(()),
Some(cont) => {
match cont.unfreeze(p) {
Result::Err(e) => Result::Err(e),
Result::Ok(None) => Result::Ok(()),
Result::Ok(Some(cont)) => {
enqueue_event(p.time, {
cons_event(move |p| {
resume_process_boxed(cont, pid, (), p)
}).into_boxed()
}).call_event(p)
}
}
}
}
}
}
}
}
}
}
#[inline]
pub fn begin_preemption(transact_id: Grc<Self>) -> impl Event<Item = ()> + Clone {
cons_event(move |p| transact_id.begin_preemption_at(p))
}
#[inline]
pub fn end_preemption(transact_id: Grc<Self>) -> impl Event<Item = ()> + Clone {
cons_event(move |p| transact_id.end_preemption_at(p))
}
pub fn require_process_id(&self, p: &Point) -> simulation::Result<Grc<ProcessId>> {
match self.process_id.read_at(p) {
Some(pid) => Result::Ok(pid),
None => {
let msg = String::from("The transact must be associated with some process");
let err = Error::retry(msg);
Result::Err(err)
}
}
}
pub fn transfer<M>(transact_id: Grc<Self>, comp: M, p: &Point) -> simulation::Result<()>
where M: Process<Item = ()> + 'static
{
let result = {
match transact_id.process_id.read_at(p) {
None => Result::Ok(()),
Some(pid) => pid.initiate_cancel_at(p)
}
};
match result {
Result::Err(e) => Result::Err(e),
Result::Ok(()) => {
transact_id.process_id.write_at(None, p);
transact_id.process_cont.write_at(None, p);
TransactId::take(transact_id)
.and_then(move |()| {
transfer_process(comp)
})
.run()
.call_event(p)
}
}
}
pub fn reactivate<I, M>(iter: I, p: &Point) -> simulation::Result<()>
where I: IntoIterator<Item = (Grc<Self>, Option<M>)>,
M: Process<Item = ()> + 'static
{
let mut iter = iter.into_iter();
while let Some((transact_id, comp)) = iter.next() {
match comp {
None => {
match transact_id.require_process_id(p) {
Result::Err(e) => return Result::Err(e),
Result::Ok(pid) => {
match ProcessId::reactivate(pid).call_event(p) {
Result::Err(e) => return Result::Err(e),
Result::Ok(()) => {}
}
}
}
},
Some(comp) => {
match TransactId::transfer(transact_id, comp, p) {
Result::Err(e) => return Result::Err(e),
Result::Ok(()) => {}
}
}
}
}
Result::Ok(())
}
}
#[derive(Clone)]
pub struct NewTransact<T> {
arrival: Arrival<T>,
priority: isize
}
impl<T> Simulation for NewTransact<T> {
type Item = Transact<T>;
#[doc(hidden)]
#[inline]
fn call_simulation(self, r: &Run) -> simulation::Result<Self::Item> {
let NewTransact { arrival, priority } = self;
let gen = &r.generator;
let sequence_no = gen.random_sequence_no();
let transact_id = Grc::new(TransactId {
sequence_no: sequence_no,
arrival_delay: arrival.delay,
arrival_time: arrival.time,
priority: RefComp::new(priority),
assembly_set: Grc::new(RefComp::new(None)),
preemption_count: RefComp::new(0),
process_id: RefComp::new(None),
process_cont: RefComp::new(None),
queue_entries: RefCell::new(HashMap::new())
});
Result::Ok(Transact {
transact_id: transact_id,
value: arrival.value
})
}
}
#[derive(Clone)]
pub struct Split<T> {
transact: Transact<T>
}
impl<T> Simulation for Split<T> {
type Item = Transact<T>;
#[doc(hidden)]
#[inline]
fn call_simulation(self, r: &Run) -> simulation::Result<Self::Item> {
let Split { transact } = self;
let gen = &r.generator;
let sequence_no = gen.random_sequence_no();
let transact_id = Grc::new(TransactId {
sequence_no: sequence_no,
arrival_delay: transact.transact_id.arrival_delay,
arrival_time: transact.transact_id.arrival_time,
priority: transact.transact_id.priority.clone(),
assembly_set: transact.transact_id.assembly_set.clone(),
preemption_count: RefComp::new(0),
process_id: RefComp::new(None),
process_cont: RefComp::new(None),
queue_entries: RefCell::new(HashMap::new())
});
Result::Ok(Transact {
transact_id: transact_id,
value: transact.value
})
}
}
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct Take {
transact_id: Grc<TransactId>
}
impl Process for Take {
type Item = ();
#[doc(hidden)]
fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
{
let Take { transact_id } = self;
match transact_id.process_id.read_at(p) {
Some(_) => {
let msg = String::from("The transact is acquired by another process");
let err = Error::retry(msg);
Result::Err(err)
},
None => {
let priority = transact_id.priority.read_at(p);
let cont = substitute_process_priority(priority, cont);
transact_id.process_id.write_at(Some(pid.clone()), p);
let n = transact_id.preemption_count.read_at(p);
if n == 0 {
resume_process_boxed(cont, pid, (), p)
} else {
let comp = TransactId::take(transact_id.clone());
match FrozenProcess::with_reentering(cont, pid.clone(), (), comp, p) {
Result::Err(e) => Result::Err(e),
Result::Ok(c) => {
transact_id.process_cont.write_at(Some(c), p);
for _ in 0 .. n {
match pid.begin_preemption_at(p) {
Result::Err(e) => return Result::Err(e),
Result::Ok(()) => {}
}
}
Result::Ok(())
}
}
}
}
}
}
#[doc(hidden)]
fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
let Take { transact_id } = self;
match transact_id.process_id.read_at(p) {
Some(_) => {
let msg = String::from("The transact is acquired by another process");
let err = Error::retry(msg);
Result::Err(err)
},
None => {
let priority = transact_id.priority.read_at(p);
let cont = substitute_process_priority_boxed(priority, cont);
transact_id.process_id.write_at(Some(pid.clone()), p);
let n = transact_id.preemption_count.read_at(p);
if n == 0 {
resume_process_boxed(cont, pid, (), p)
} else {
let comp = TransactId::take(transact_id.clone());
match FrozenProcess::with_reentering(cont, pid.clone(), (), comp, p) {
Result::Err(e) => Result::Err(e),
Result::Ok(c) => {
transact_id.process_cont.write_at(Some(c), p);
for _ in 0 .. n {
match pid.begin_preemption_at(p) {
Result::Err(e) => return Result::Err(e),
Result::Ok(()) => {}
}
}
Result::Ok(())
}
}
}
}
}
}
}
#[must_use = "computations are lazy and do nothing unless to be run"]
#[derive(Clone)]
pub struct Release {
transact_id: Grc<TransactId>
}
impl Process for Release {
type Item = ();
#[doc(hidden)]
fn call_process<C>(self, cont: C, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()>
where C: FnOnce(simulation::Result<Self::Item>, Grc<ProcessId>, &Point) -> simulation::Result<()> + 'static
{
let Release { transact_id } = self;
match transact_id.process_id.read_at(p) {
None => {
let msg = String::from("The transact is not acquired by any process");
let err = Error::retry(msg);
Result::Err(err)
},
Some(ref pid0) if *pid0 != pid => {
let msg = String::from("The transact is acquired by another process");
let err = Error::retry(msg);
Result::Err(err)
},
Some(_) => {
transact_id.process_id.write_at(None, p);
transact_id.process_cont.write_at(None, p);
resume_process(cont, pid, (), p)
}
}
}
#[doc(hidden)]
fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Grc<ProcessId>, p: &Point) -> simulation::Result<()> {
let Release { transact_id } = self;
match transact_id.process_id.read_at(p) {
None => {
let msg = String::from("The transact is not acquired by any process");
let err = Error::retry(msg);
Result::Err(err)
},
Some(ref pid0) if *pid0 != pid => {
let msg = String::from("The transact is acquired by another process");
let err = Error::retry(msg);
Result::Err(err)
},
Some(_) => {
transact_id.process_id.write_at(None, p);
transact_id.process_cont.write_at(None, p);
resume_process_boxed(cont, pid, (), p)
}
}
}
}