use std::rc::Rc;
use std::hash::{Hash, Hasher};
use dvcompute::simulation;
use dvcompute::simulation::ref_comp::RefComp;
use dvcompute::simulation::Run;
use dvcompute::simulation::Point;
use dvcompute::simulation::error::*;
use dvcompute::simulation::simulation::*;
use dvcompute::simulation::event::*;
use dvcompute::simulation::process::*;
use dvcompute::simulation::strategy::QueueStorage;
use crate::simulation::transact::*;
use crate::simulation::strategy::*;
/// Builds the [`AssembleTransact`] process computation for the given
/// transact and the requested number of transacts.
///
/// Nothing is executed here; the arguments are only stored in the
/// returned value.
#[inline]
pub fn assemble_transact<T>(transact: Rc<Transact<T>>, count: isize) -> AssembleTransact<T> {
    AssembleTransact { transact, count }
}
/// Builds the [`GatherTransacts`] process computation for the given
/// transact and the requested number of transacts.
///
/// Nothing is executed here; the arguments are only stored in the
/// returned value.
#[inline]
pub fn gather_transacts<T>(transact: Rc<Transact<T>>, count: isize) -> GatherTransacts<T> {
    GatherTransacts { transact, count }
}
/// Builds the [`IsTransactAssembling`] event computation, which reports
/// whether the assembling counter of the transact's assembly set is
/// positive.
#[inline]
pub fn is_transact_assembling<T>(transact: Rc<Transact<T>>) -> IsTransactAssembling<T> {
    IsTransactAssembling { transact }
}
/// Builds the [`IsTransactGathering`] event computation, which reports
/// whether the gathering counter of the transact's assembly set is
/// positive.
#[inline]
pub fn is_transact_gathering<T>(transact: Rc<Transact<T>>) -> IsTransactGathering<T> {
    IsTransactGathering { transact }
}
/// The shared state used by the assembling and gathering operations on
/// transacts.
pub struct AssemblySet {
// Number obtained from the run's generator when the set is created;
// it is the only field fed into the `Hash` implementation.
sequence_no: u64,
// Process identifier of the transact that started the current assembling
// and was passivated; `None` while no assembling is in progress.
assembling_transact: RefComp<Option<Rc<ProcessId>>>,
// How many more transacts must arrive before the assembling completes;
// zero means that no assembling is in progress.
assembling_count: RefComp<isize>,
// Process identifiers of the passivated transacts taking part in the
// current gathering, stored together with the transact priority.
gathering_transacts: FCFSStorage<Rc<ProcessId>>,
// How many more transacts must arrive before the gathering completes;
// zero means that no gathering is in progress.
gathering_count: RefComp<isize>
}
/// Equality of assembly sets is delegated to the comparison of their
/// `assembling_transact` cells.
// NOTE(review): `Hash` for this type is based on `sequence_no`, so the
// `Hash`/`Eq` contract holds only if the cell comparison is presumably
// identity-based — confirm against the `RefComp` implementation.
impl PartialEq for AssemblySet {

    fn eq(&self, other: &Self) -> bool {
        PartialEq::eq(&self.assembling_transact, &other.assembling_transact)
    }
}
// Marker implementation: declares the `PartialEq` comparison of assembly
// sets to be a full equivalence relation.
impl Eq for AssemblySet {}
/// Hashing of an assembly set feeds only its `sequence_no` into the hasher.
impl Hash for AssemblySet {

    fn hash<H: Hasher>(&self, state: &mut H) {
        Hash::hash(&self.sequence_no, state);
    }
}
impl AssemblySet {
/// Returns the `NewAssemblySet` simulation computation; the assembly set
/// itself is only built when that computation is called within a run.
#[inline]
pub fn new() -> NewAssemblySet {
NewAssemblySet {}
}
}
/// The simulation computation that creates a new `AssemblySet`.
/// It carries no data of its own.
#[derive(Clone)]
pub struct NewAssemblySet {}
/// Creates an empty assembly set within the given run.
impl Simulation for NewAssemblySet {

    type Item = AssemblySet;

    /// Draws a sequence number from the generator of the run and builds
    /// an assembly set with no pending assembling or gathering: both
    /// counters are zero, no assembling process is stored and the
    /// gathering storage is freshly created.
    #[doc(hidden)]
    fn call_simulation(self, r: &Run) -> simulation::Result<Self::Item> {
        // The sequence number is requested first, before any cell is created.
        let sequence_no = r.generator.random_sequence_no();
        let set = AssemblySet {
            sequence_no,
            assembling_transact: RefComp::new(None),
            assembling_count: RefComp::new(0),
            gathering_transacts: FCFSStorage::new(),
            gathering_count: RefComp::new(0)
        };
        Result::Ok(set)
    }
}
/// The process computation created by `assemble_transact`.
#[derive(Clone)]
pub struct AssembleTransact<T> {
// The transact whose assembly set is used.
transact: Rc<Transact<T>>,
// The requested number of transacts; values below one are rejected
// with an error when a new assembling is started.
count: isize
}
impl<T> Process for AssembleTransact<T> {
type Item = ();
#[doc(hidden)]
fn call_process<C>(self, cont: C, pid: Rc<ProcessId>, p: &Point) -> simulation::Result<()>
where C: FnOnce(simulation::Result<Self::Item>, Rc<ProcessId>, &Point) -> simulation::Result<()> + 'static
{
if is_process_cancelled(&pid, p) {
revoke_process(cont, pid, p)
} else {
let AssembleTransact { transact, count } = self;
match transact.assembly_set(p) {
Result::Err(e) => Result::Err(e),
Result::Ok(s) => {
let a = s.assembling_count.read_at(p);
if a == 0 {
let count = count - 1;
if count < 0 {
let msg = String::from("The number of transacts must be positive");
let err = OtherError::retry(msg);
cut_error_process(cont, pid, err, p)
} else if count == 0 {
Result::Ok(())
} else {
match transact.require_process_id(p) {
Result::Err(e) => Result::Err(e),
Result::Ok(pid0) => {
s.assembling_transact.write_at(Some(pid0), p);
s.assembling_count.write_at(count, p);
passivate_process().call_process(cont, pid, p)
}
}
}
} else {
let a = a - 1;
if a == 0 {
let pid0 = s.assembling_transact.swap_at(None, p).unwrap();
s.assembling_count.write_at(a, p);
match ProcessId::reactivate_immediately(pid0).call_event(p) {
Result::Err(e) => Result::Err(e),
Result::Ok(()) => {
cancel_process().call_process(cont, pid, p)
}
}
} else {
s.assembling_count.write_at(a, p);
cancel_process().call_process(cont, pid, p)
}
}
}
}
}
}
#[doc(hidden)]
fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Rc<ProcessId>, p: &Point) -> simulation::Result<()> {
if is_process_cancelled(&pid, p) {
revoke_process_boxed(cont, pid, p)
} else {
let AssembleTransact { transact, count } = self;
match transact.assembly_set(p) {
Result::Err(e) => Result::Err(e),
Result::Ok(s) => {
let a = s.assembling_count.read_at(p);
if a == 0 {
let count = count - 1;
if count < 0 {
let msg = String::from("The number of transacts must be positive");
let err = OtherError::retry(msg);
cut_error_process_boxed(cont, pid, err, p)
} else if count == 0 {
Result::Ok(())
} else {
match transact.require_process_id(p) {
Result::Err(e) => Result::Err(e),
Result::Ok(pid0) => {
s.assembling_transact.write_at(Some(pid0), p);
s.assembling_count.write_at(count, p);
passivate_process().call_process_boxed(cont, pid, p)
}
}
}
} else {
let a = a - 1;
if a == 0 {
let pid0 = s.assembling_transact.swap_at(None, p).unwrap();
s.assembling_count.write_at(a, p);
match ProcessId::reactivate_immediately(pid0).call_event(p) {
Result::Err(e) => Result::Err(e),
Result::Ok(()) => {
cancel_process().call_process_boxed(cont, pid, p)
}
}
} else {
s.assembling_count.write_at(a, p);
cancel_process().call_process_boxed(cont, pid, p)
}
}
}
}
}
}
}
/// The process computation created by `gather_transacts`.
#[derive(Clone)]
pub struct GatherTransacts<T> {
// The transact whose assembly set is used.
transact: Rc<Transact<T>>,
// The requested number of transacts; values below one are rejected
// with an error when a new gathering is started.
count: isize
}
impl<T> Process for GatherTransacts<T> {
type Item = ();
#[doc(hidden)]
fn call_process<C>(self, cont: C, pid: Rc<ProcessId>, p: &Point) -> simulation::Result<()>
where C: FnOnce(simulation::Result<Self::Item>, Rc<ProcessId>, &Point) -> simulation::Result<()> + 'static
{
if is_process_cancelled(&pid, p) {
revoke_process(cont, pid, p)
} else {
let GatherTransacts { transact, count } = self;
match transact.assembly_set(p) {
Result::Err(e) => Result::Err(e),
Result::Ok(s) => {
let a = s.gathering_count.read_at(p);
if a == 0 {
let count = count - 1;
if count < 0 {
let msg = String::from("The number of transacts must be positive");
let err = OtherError::retry(msg);
cut_error_process(cont, pid, err, p)
} else if count == 0 {
Result::Ok(())
} else {
match transact.require_process_id(p) {
Result::Err(e) => Result::Err(e),
Result::Ok(pid0) => {
let priority = Transact::priority_at(&transact, p);
s.gathering_transacts.push_with_priority(priority, pid0, p);
s.gathering_count.write_at(count, p);
passivate_process().call_process(cont, pid, p)
}
}
}
} else {
match transact.require_process_id(p) {
Result::Err(e) => Result::Err(e),
Result::Ok(pid0) => {
let a = a - 1;
let priority = Transact::priority_at(&transact, p);
s.gathering_transacts.push_with_priority(priority, pid0, p);
s.gathering_count.write_at(a, p);
if a == 0 {
let comp = cons_event(move |p| {
let capacity = if count > 0 { count as usize } else { 1 };
let mut pids = Vec::with_capacity(capacity);
while let Some(pid0) = s.gathering_transacts.pop(p) {
pids.push(pid0);
}
let pids = pids.into_iter();
ProcessId::reactivate_many_immediately(pids).call_event(p)
});
passivate_process_before(comp).call_process(cont, pid, p)
} else {
passivate_process().call_process(cont, pid, p)
}
}
}
}
}
}
}
}
#[doc(hidden)]
fn call_process_boxed(self, cont: ProcessBoxCont<Self::Item>, pid: Rc<ProcessId>, p: &Point) -> simulation::Result<()> {
if is_process_cancelled(&pid, p) {
revoke_process_boxed(cont, pid, p)
} else {
let GatherTransacts { transact, count } = self;
match transact.assembly_set(p) {
Result::Err(e) => Result::Err(e),
Result::Ok(s) => {
let a = s.gathering_count.read_at(p);
if a == 0 {
let count = count - 1;
if count < 0 {
let msg = String::from("The number of transacts must be positive");
let err = OtherError::retry(msg);
cut_error_process_boxed(cont, pid, err, p)
} else if count == 0 {
Result::Ok(())
} else {
match transact.require_process_id(p) {
Result::Err(e) => Result::Err(e),
Result::Ok(pid0) => {
let priority = Transact::priority_at(&transact, p);
s.gathering_transacts.push_with_priority(priority, pid0, p);
s.gathering_count.write_at(count, p);
passivate_process().call_process_boxed(cont, pid, p)
}
}
}
} else {
match transact.require_process_id(p) {
Result::Err(e) => Result::Err(e),
Result::Ok(pid0) => {
let a = a - 1;
let priority = Transact::priority_at(&transact, p);
s.gathering_transacts.push_with_priority(priority, pid0, p);
s.gathering_count.write_at(a, p);
if a == 0 {
let comp = cons_event(move |p| {
let capacity = if count > 0 { count as usize } else { 1 };
let mut pids = Vec::with_capacity(capacity);
while let Some(pid0) = s.gathering_transacts.pop(p) {
pids.push(pid0);
}
let pids = pids.into_iter();
ProcessId::reactivate_many_immediately(pids).call_event(p)
});
passivate_process_before(comp).call_process_boxed(cont, pid, p)
} else {
passivate_process().call_process_boxed(cont, pid, p)
}
}
}
}
}
}
}
}
}
/// The event computation created by `is_transact_assembling`.
#[derive(Clone)]
pub struct IsTransactAssembling<T> {
// The transact whose assembly set is inspected.
transact: Rc<Transact<T>>
}
/// Tests whether an assembling is in progress for the transact.
impl<T> Event for IsTransactAssembling<T> {

    type Item = bool;

    /// Yields `true` when the assembling counter of the transact's
    /// assembly set is positive at the given point; an error from
    /// `assembly_set` is returned unchanged.
    #[doc(hidden)]
    #[inline]
    fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
        self.transact
            .assembly_set(p)
            .map(|set| set.assembling_count.read_at(p) > 0)
    }
}
/// The event computation created by `is_transact_gathering`.
#[derive(Clone)]
pub struct IsTransactGathering<T> {
// The transact whose assembly set is inspected.
transact: Rc<Transact<T>>
}
/// Tests whether a gathering is in progress for the transact.
impl<T> Event for IsTransactGathering<T> {

    type Item = bool;

    /// Yields `true` when the gathering counter of the transact's
    /// assembly set is positive at the given point; an error from
    /// `assembly_set` is returned unchanged.
    #[doc(hidden)]
    #[inline]
    fn call_event(self, p: &Point) -> simulation::Result<Self::Item> {
        self.transact
            .assembly_set(p)
            .map(|set| set.gathering_count.read_at(p) > 0)
    }
}