use std::rc::Rc;
use std::cell::RefCell;
use std::collections::HashMap;
use dvcompute::simulation;
use dvcompute::simulation::Run;
use dvcompute::simulation::Point;
use dvcompute::simulation::error::*;
use dvcompute::simulation::simulation::*;
use dvcompute::simulation::event::*;
use dvcompute::simulation::process::*;
use dvcompute::simulation::ref_comp::RefComp;
use dvcompute_utils::simulation::arrival::*;
use crate::simulation::queue::*;
use crate::simulation::assembly::*;
/// A transact: a prioritised value of type `T` that a process can take,
/// release and preempt, and that can be registered in queues.
pub struct Transact<T> {
/// The sequence number drawn from the run's generator when the transact is built.
pub sequence_no: u64,
/// The mutable value carried by the transact.
val: RefComp<T>,
/// The delay copied from the originating arrival (or from the split source).
pub arrival_delay: Option<f64>,
/// The time copied from the originating arrival (or from the split source).
pub arrival_time: f64,
/// The current priority of the transact.
priority: RefComp<isize>,
/// The assembly set, created on first request by `assembly_set`; the `Rc`
/// handle is shared with transacts produced by `split`.
assembly_set: Rc<RefComp<Option<Rc<AssemblySet>>>>,
/// How many preemptions are outstanding: incremented by
/// `begin_preemption_at`, decremented by `end_preemption_at`.
preemption_count: RefComp<isize>,
/// The process that currently holds the transact, if any.
process_id: RefComp<Option<Rc<ProcessId>>>,
/// The frozen continuation stored when the transact is taken while its
/// preemption count is non-zero; consumed by `end_preemption_at`.
process_cont: RefComp<Option<FrozenProcess<()>>>,
/// The queue entries of this transact, at most one per queue.
queue_entries: RefCell<HashMap<Rc<Queue>, QueueEntry>>
}
/// Two transacts compare equal when their `process_id` cells compare equal.
///
/// NOTE(review): the outcome depends on how `RefComp` implements `PartialEq`
/// (by cell identity or by content) — confirm against `dvcompute`.
impl<T> PartialEq for Transact<T> {
    fn eq(&self, other: &Self) -> bool {
        let (lhs, rhs) = (&self.process_id, &other.process_id);
        lhs == rhs
    }
}
/// Marks the `PartialEq` comparison of transacts as a full equivalence relation.
impl<T> Eq for Transact<T> {}
impl<T> Transact<T> {
/// Describe the creation of a transact from `arrival` with the given `priority`.
///
/// Nothing is created here: the returned `NewTransact` builds the transact
/// when it is run as a simulation computation.
#[inline]
pub fn new(arrival: Rc<Arrival<T>>, priority: isize) -> NewTransact<T> where T: Clone {
    NewTransact { arrival, priority }
}
/// Describe the splitting of `transact` into a new transact.
///
/// The returned `Split` performs the actual work when it is run as a
/// simulation computation.
#[inline]
pub fn split(transact: Rc<Transact<T>>) -> Split<T> where T: Clone {
    Split { transact }
}
/// Read the value carried by the transact at point `p`.
#[inline]
pub fn val_at(&self, p: &Point) -> T where T: Clone {
self.val.read_at(p)
}
/// Replace the value carried by the transact with `val` at point `p`.
#[inline]
pub fn assign_val_at(&self, val: T, p: &Point) {
self.val.write_at(val, p);
}
/// Event computation that yields the value carried by `transact`.
#[inline]
pub fn val(transact: Rc<Transact<T>>) -> impl Event<Item = T> where T: Clone {
    cons_event(move |point| Ok(transact.val_at(point)))
}
/// Event computation that stores `val` as the new value of `transact`.
#[inline]
pub fn assign_val(transact: Rc<Transact<T>>, val: T) -> impl Event<Item = ()> {
    cons_event(move |point| {
        transact.assign_val_at(val, point);
        Ok(())
    })
}
/// Read the priority of the transact at point `p`.
#[inline]
pub fn priority_at(&self, p: &Point) -> isize {
self.priority.read_at(p)
}
/// Event computation that yields the current priority of `transact`.
#[inline]
pub fn priority(transact: Rc<Transact<T>>) -> impl Event<Item = isize> {
    cons_event(move |point| Ok(transact.priority_at(point)))
}
/// Process computation that changes the priority of `transact`.
///
/// It first stores `priority` in the transact and then continues with
/// `process_with_priority(priority)`.
#[inline]
pub fn assign_priority(transact: Rc<Transact<T>>, priority: isize) -> impl Process<Item = ()>
    where T: 'static
{
    // Step 1: record the new priority inside the transact itself.
    let store_priority = cons_event(move |point| {
        transact.priority.write_at(priority, point);
        Ok(())
    });

    // Step 2: hand the same priority over to `process_with_priority`.
    store_priority
        .into_process()
        .flat_map(move |()| process_with_priority(priority))
}
pub fn register_queue_entry(&self, entry: QueueEntry, _p: &Point) -> simulation::Result<()> {
let mut entries = self.queue_entries.borrow_mut();
if entries.contains_key(&entry.queue) {
let msg = String::from("There is another entry for the specified queue");
Result::Err(Error::retry(msg))
} else {
let _ = entries.insert(entry.queue.clone(), entry);
Result::Ok(())
}
}
pub fn unregister_queue_entry(&self, queue: &Queue, _p: &Point) -> simulation::Result<QueueEntry> {
let mut entries = self.queue_entries.borrow_mut();
match entries.remove(queue) {
None => {
let msg = String::from("There must be entry for the specified queue");
Result::Err(Error::retry(msg))
},
Some(entry) => {
Result::Ok(entry)
}
}
}
pub fn assembly_set(&self, p: &Point) -> simulation::Result<Rc<AssemblySet>> {
match self.assembly_set.read_at(p) {
Some(a) => Result::Ok(a),
None => {
match AssemblySet::new().call_simulation(&p.run) {
Result::Err(e) => Result::Err(e),
Result::Ok(a) => {
let a = Rc::new(a);
self.assembly_set.write_at(Some(a.clone()), p);
Result::Ok(a)
}
}
}
}
}
/// Process computation by which the running process takes `transact`.
pub fn take(transact: Rc<Self>) -> Take<T> where T: 'static {
    Take { transact }
}
/// Process computation by which the running process releases `transact`.
pub fn release(transact: Rc<Self>) -> Release<T> where T: 'static {
    Release { transact }
}
/// Begin one more preemption of the transact at point `p`.
///
/// The preemption counter is always incremented; when a process holds the
/// transact, the preemption is also forwarded to that process.
#[doc(hidden)]
pub fn begin_preemption_at(&self, p: &Point) -> simulation::Result<()> {
    let count = self.preemption_count.read_at(p);
    self.preemption_count.write_at(count + 1, p);

    if let Some(pid) = self.process_id.read_at(p) {
        pid.begin_preemption_at(p)
    } else {
        Ok(())
    }
}
#[doc(hidden)]
pub fn end_preemption_at(&self, p: &Point) -> simulation::Result<()> {
let n = self.preemption_count.read_at(p);
if n <= 0 {
let msg = String::from("The transact preemption count cannot be negative");
let err = Error::retry(msg);
Result::Err(err)
} else {
self.preemption_count.write_at(n - 1, p);
match self.process_id.read_at(p) {
None => Result::Ok(()),
Some(pid) => {
match pid.end_preemption_at(p) {
Result::Err(e) => Result::Err(e),
Result::Ok(()) => {
match self.process_cont.swap_at(None, p) {
None => Result::Ok(()),
Some(cont) => {
match cont.unfreeze(p) {
Result::Err(e) => Result::Err(e),
Result::Ok(None) => Result::Ok(()),
Result::Ok(Some(cont)) => {
enqueue_event(p.time, {
cons_event(move |p| {
resume_process_boxed(cont, pid, (), p)
}).into_boxed()
}).call_event(p)
}
}
}
}
}
}
}
}
}
}
/// Cloneable event computation that calls `begin_preemption_at` on `transact`.
#[inline]
pub fn begin_preemption(transact: Rc<Self>) -> impl Event<Item = ()> + Clone {
cons_event(move |p| transact.begin_preemption_at(p))
}
/// Cloneable event computation that calls `end_preemption_at` on `transact`.
#[inline]
pub fn end_preemption(transact: Rc<Self>) -> impl Event<Item = ()> + Clone {
cons_event(move |p| transact.end_preemption_at(p))
}
pub fn require_process_id(&self, p: &Point) -> simulation::Result<Rc<ProcessId>> {
match self.process_id.read_at(p) {
Some(pid) => Result::Ok(pid),
None => {
let msg = String::from("The transact must be associated with some process");
let err = Error::retry(msg);
Result::Err(err)
}
}
}
/// Move `transact` over to the computation `comp`.
///
/// If a process currently holds the transact, its cancellation is initiated
/// first. The transact is then detached (owner and stored continuation are
/// cleared) and a computation that takes the transact and continues with
/// `transfer_process(comp)` is run at point `p`.
///
/// # Errors
///
/// Propagates the error of the cancellation (nothing is detached then) or
/// of launching the new computation.
pub fn transfer<M>(transact: Rc<Self>, comp: M, p: &Point) -> simulation::Result<()>
    where M: Process<Item = ()> + 'static,
          T: 'static
{
    if let Some(owner) = transact.process_id.read_at(p) {
        owner.initiate_cancel_at(p)?;
    }

    transact.process_id.write_at(None, p);
    transact.process_cont.write_at(None, p);

    let restarted = Transact::take(transact)
        .flat_map(move |()| transfer_process(comp));
    restarted.run().call_event(p)
}
/// Reactivate every transact yielded by `iter`, in order.
///
/// For a pair `(transact, None)` the process holding the transact is
/// reactivated; for `(transact, Some(comp))` the transact is transferred to
/// `comp` instead.
///
/// # Errors
///
/// Stops at the first failure and returns it; this includes a transact
/// paired with `None` that is not held by any process.
pub fn reactivate<I, M>(iter: I, p: &Point) -> simulation::Result<()>
    where I: IntoIterator<Item = (Rc<Self>, Option<M>)>,
          M: Process<Item = ()> + 'static,
          T: 'static
{
    for (transact, comp) in iter {
        match comp {
            Some(comp) => Transact::transfer(transact, comp, p)?,
            None => {
                let pid = transact.require_process_id(p)?;
                ProcessId::reactivate(pid).call_event(p)?;
            }
        }
    }
    Ok(())
}
}
/// The computation returned by `Transact::new`; running it builds the transact.
#[derive(Clone)]
pub struct NewTransact<T> {
/// The arrival that supplies the initial value, the delay and the time.
arrival: Rc<Arrival<T>>,
/// The initial priority of the transact.
priority: isize
}
/// Builds a fresh transact from the stored arrival and priority.
impl<T> Simulation for NewTransact<T>
    where T: Clone
{
    type Item = Transact<T>;

    /// Create the transact within run `r`.
    ///
    /// The value is cloned from the arrival, the delay and time are copied
    /// from it, and the sequence number comes from the run's generator. The
    /// transact starts with no assembly set, no owning process, a zero
    /// preemption count and no queue entries.
    #[doc(hidden)]
    #[inline]
    fn call_simulation(self, r: &Run) -> simulation::Result<Self::Item> {
        let NewTransact { arrival, priority } = self;
        let generator = &r.generator;
        let sequence_no = generator.random_sequence_no();

        Ok(Transact {
            sequence_no,
            val: RefComp::new(arrival.val.clone()),
            arrival_delay: arrival.delay,
            arrival_time: arrival.time,
            priority: RefComp::new(priority),
            assembly_set: Rc::new(RefComp::new(None)),
            preemption_count: RefComp::new(0),
            process_id: RefComp::new(None),
            process_cont: RefComp::new(None),
            queue_entries: RefCell::new(HashMap::new())
        })
    }
}
/// The computation returned by `Transact::split`; running it builds a new
/// transact derived from the stored one.
#[derive(Clone)]
pub struct Split<T> {
/// The transact to split.
transact: Rc<Transact<T>>
}
/// Builds a new transact derived from an existing one.
impl<T> Simulation for Split<T>
    where T: Clone
{
    type Item = Transact<T>;

    /// Create the derived transact within run `r`.
    ///
    /// The value and priority cells are cloned from the source, the arrival
    /// delay and time are copied, and the assembly-set handle is shared with
    /// the source. The new transact gets its own sequence number and starts
    /// with a zero preemption count, no owning process and no queue entries.
    #[doc(hidden)]
    #[inline]
    fn call_simulation(self, r: &Run) -> simulation::Result<Self::Item> {
        let Split { transact: source } = self;
        let generator = &r.generator;
        let sequence_no = generator.random_sequence_no();

        Ok(Transact {
            sequence_no,
            val: source.val.clone(),
            arrival_delay: source.arrival_delay,
            arrival_time: source.arrival_time,
            priority: source.priority.clone(),
            assembly_set: source.assembly_set.clone(),
            preemption_count: RefComp::new(0),
            process_id: RefComp::new(None),
            process_cont: RefComp::new(None),
            queue_entries: RefCell::new(HashMap::new())
        })
    }
}
/// The process computation returned by `Transact::take`: when run, it makes
/// the running process the holder of the transact.
#[must_use = "computations are lazy and do nothing unless to be run"]
pub struct Take<T>
{
    /// The transact to take.
    transact: Rc<Transact<T>>
}

/// Cloning duplicates only the `Rc` handle, so it is available for every `T`;
/// a derived impl would needlessly demand `T: Clone`.
impl<T> Clone for Take<T> {
    #[inline]
    fn clone(&self) -> Self {
        Take { transact: Rc::clone(&self.transact) }
    }
}
impl<T> Process for Take<T>
where T: 'static
{
type Item = ();
#[doc(hidden)]
fn call_process<C>(self, cont: C, pid: Rc<ProcessId>, p: &Point) -> simulation::Result<()>
where C: FnOnce(simulation::Result<Self::Item>, Rc<ProcessId>, &Point) -> simulation::Result<()> + 'static
{
let Take { transact } = self;
match transact.process_id.read_at(p) {
Some(_) => {
let msg = String::from("The transact is acquired by another process");
let err = Error::retry(msg);
Result::Err(err)
},
None => {
let priority = transact.priority.read_at(p);
let cont = substitute_process_priority(priority, cont);
transact.process_id.write_at(Some(pid.clone()), p);
let n = transact.preemption_count.read_at(p);
if n == 0 {
resume_process_boxed(cont, pid, (), p)
} else {
let comp = Transact::take(transact.clone());
match FrozenProcess::with_reentering(cont, pid.clone(), (), comp, p) {
Result::Err(e) => Result::Err(e),
Result::Ok(c) => {
transact.process_cont.write_at(Some(c), p);
for _ in 0 .. n {
match pid.begin_preemption_at(p) {
Result::Err(e) => return Result::Err(e),
Result::Ok(()) => {}
}
}
Result::Ok(())
}
}
}
}
}
}
#[doc(hidden)]
fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Rc<ProcessId>, p: &Point) -> simulation::Result<()> {
let Take { transact } = self;
match transact.process_id.read_at(p) {
Some(_) => {
let msg = String::from("The transact is acquired by another process");
let err = Error::retry(msg);
Result::Err(err)
},
None => {
let priority = transact.priority.read_at(p);
let cont = substitute_process_priority_boxed(priority, cont);
transact.process_id.write_at(Some(pid.clone()), p);
let n = transact.preemption_count.read_at(p);
if n == 0 {
resume_process_boxed(cont, pid, (), p)
} else {
let comp = Transact::take(transact.clone());
match FrozenProcess::with_reentering(cont, pid.clone(), (), comp, p) {
Result::Err(e) => Result::Err(e),
Result::Ok(c) => {
transact.process_cont.write_at(Some(c), p);
for _ in 0 .. n {
match pid.begin_preemption_at(p) {
Result::Err(e) => return Result::Err(e),
Result::Ok(()) => {}
}
}
Result::Ok(())
}
}
}
}
}
}
}
/// The process computation returned by `Transact::release`: when run by the
/// process that holds the transact, it clears that ownership.
#[must_use = "computations are lazy and do nothing unless to be run"]
pub struct Release<T>
{
    /// The transact to release.
    transact: Rc<Transact<T>>
}

/// Cloning duplicates only the `Rc` handle, so it is available for every `T`;
/// a derived impl would needlessly demand `T: Clone`.
impl<T> Clone for Release<T> {
    #[inline]
    fn clone(&self) -> Self {
        Release { transact: Rc::clone(&self.transact) }
    }
}
impl<T> Process for Release<T>
where T: 'static
{
type Item = ();
#[doc(hidden)]
fn call_process<C>(self, cont: C, pid: Rc<ProcessId>, p: &Point) -> simulation::Result<()>
where C: FnOnce(simulation::Result<Self::Item>, Rc<ProcessId>, &Point) -> simulation::Result<()> + 'static
{
let Release { transact } = self;
match transact.process_id.read_at(p) {
None => {
let msg = String::from("The transact is not acquired by any process");
let err = Error::retry(msg);
Result::Err(err)
},
Some(ref pid0) if *pid0 != pid => {
let msg = String::from("The transact is acquired by another process");
let err = Error::retry(msg);
Result::Err(err)
},
Some(_) => {
transact.process_id.write_at(None, p);
transact.process_cont.write_at(None, p);
resume_process(cont, pid, (), p)
}
}
}
#[doc(hidden)]
fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Rc<ProcessId>, p: &Point) -> simulation::Result<()> {
let Release { transact } = self;
match transact.process_id.read_at(p) {
None => {
let msg = String::from("The transact is not acquired by any process");
let err = Error::retry(msg);
Result::Err(err)
},
Some(ref pid0) if *pid0 != pid => {
let msg = String::from("The transact is acquired by another process");
let err = Error::retry(msg);
Result::Err(err)
},
Some(_) => {
transact.process_id.write_at(None, p);
transact.process_cont.write_at(None, p);
resume_process_boxed(cont, pid, (), p)
}
}
}
}