use std::sync::Arc;
use std::default::Default;
use std::result;
#[cfg(feature="seq_mode")]
use std::io;
#[cfg(feature="seq_mode")]
use std::io::*;
use dvcompute::simulation;
use dvcompute::simulation::*;
use dvcompute::simulation::generator::*;
use dvcompute::simulation::composite::*;
use dvcompute::simulation::simulation::*;
use dvcompute::simulation::event::*;
use dvcompute::simulation::observable::disposable::*;
use dvcompute_results::simulation::results;
use dvcompute_results::simulation::results::*;
use dvcompute_results::simulation::results::locale::*;
use dvcompute_utils::grc::GrcStorage;
#[cfg(feature="seq_mode")]
use rayon::prelude::*;
#[cfg(feature="cons_mode")]
use dvcompute::simulation::comm::context::*;
#[cfg(feature="cons_mode")]
use dvcompute::simulation::comm::pid::*;
#[cfg(feature="cons_mode")]
use dvcompute::simulation::comm::lp::*;
#[cfg(feature="cons_mode")]
use dvcompute_network::network::*;
pub mod rendering;
pub mod view;
#[derive(Clone)]
pub struct Experiment {
pub specs: Specs,
pub transform: Arc<dyn Fn() -> ResultTransform + Sync + Send>,
pub locale: ResultLocale,
pub run_count: usize,
pub title: String,
pub description: Option<String>,
pub verbose: bool,
pub num_threads: Option<usize>
}
impl Default for Experiment {
fn default() -> Self {
Self {
specs: Specs {
start_time: 0.0,
stop_time: 10.0,
dt: 0.01,
generator_type: GeneratorType::Simple
},
transform: Arc::new(|| { ResultTransform::new(|x| { results::Result::Ok(x.clone()) }) }),
locale: ResultLocale::En,
run_count: 1,
title: String::from("Simulation Experiment"),
description: None,
verbose: true,
num_threads: None
}
}
}
impl Experiment {
#[cfg(any(feature="seq_mode", feature="wasm_mode"))]
pub fn run<I, R, F, E, M>(&self,
generators: I,
rendering: R,
simulation: F,
executor: E) -> simulation::Result<()>
where
I: IntoIterator<Item = Box<dyn ExperimentGenerator<R>>>,
R: ExperimentRendering + Send + 'static,
F: FnOnce() -> M + Sync + Send + Clone + 'static,
M: Simulation<Item = ResultSet> + 'static,
E: ExperimentExecutor
{
let specs = self.specs.clone();
let launcher = move |comp: SimulationBox<()>, run_index, run_count| {
comp.run_by_index(specs, run_index, run_count)
};
self.run_with_executor_and_launcher(generators, rendering, simulation, executor, launcher)
}
#[cfg(feature="cons_mode")]
pub fn run<'a, I, R, F, E>(&self,
generators: I,
rendering: R,
simulation: F,
executor: E) -> simulation::Result<()>
where
I: IntoIterator<Item = Box<dyn ExperimentGenerator<R>>>,
R: ExperimentRendering + Send + 'static,
F: FnOnce(&LogicalProcessContext, ExperimentCont) -> simulation::Result<()> + Sync + Send + Clone + 'static,
E: ExperimentExecutor
{
let specs = self.specs.clone();
let launcher = move |ctx: &LogicalProcessContext, comp: SimulationBox<()>, run_index, run_count| {
comp.run_by_index(specs, ctx, run_index, run_count)
};
self.run_with_executor_and_launcher(generators, rendering, simulation, executor, launcher)
}
#[cfg(any(feature="seq_mode", feature="wasm_mode"))]
fn run_with_executor_and_launcher<I, R, F, E, L, M>(&self,
generators: I,
rendering: R,
simulation: F,
executor: E,
launcher: L) -> simulation::Result<()>
where
I: IntoIterator<Item = Box<dyn ExperimentGenerator<R>>>,
R: ExperimentRendering + Send + 'static,
F: FnOnce() -> M + Sync + Send + Clone + 'static,
M: Simulation<Item = ResultSet> + 'static,
E: ExperimentExecutor,
L: FnOnce(SimulationBox<()>, usize, usize) -> simulation::Result<()> + Sync + Send + Clone + 'static
{
let env = rendering.prepare(self)?;
let reporters: Vec<_> = generators.into_iter()
.map(|x| { x.report(&rendering, self, &env) })
.collect();
let reporters = Arc::new(reporters);
for reporter in reporters.iter() {
reporter.initialise()?;
}
let simulate: Vec<_> = (0 .. self.run_count)
.map(|run_index| {
let run_count = self.run_count;
let simulation = simulation.clone();
let launcher = launcher.clone();
let reporters = reporters.clone();
let f: Box<dyn Fn() -> simulation::Result<()> + Sync + Send> = {
Box::new(move || {
let simulation = simulation.clone();
let launcher = launcher.clone();
let reporters = reporters.clone();
let comp = {
ResultPredefinedObservableSet::new()
.and_then(move |predefined_observables| {
simulation().and_then(move |results| {
let d = ExperimentData { results, predefined_observables };
let comps: Vec<_> = reporters.iter()
.map(|x| { x.simulate(&d) })
.collect();
composite_sequence_(comps)
.run(empty_disposable())
.run_in_start_time_by(false)
.and_then(move |((), fs)| {
return_event(())
.run_in_stop_time()
.finally({
fs.into_event()
.run_in_stop_time()
})
})
})
})
.into_boxed()
};
launcher(comp, run_index, run_count)
})
};
f
})
.collect();
let x = executor.execute(simulate);
for reporter in reporters.iter() {
reporter.finalise()?;
}
match x {
result::Result::Ok(a) => {
rendering.render(&self, &reporters, &env)?;
rendering.on_completed(&self, &env)?;
result::Result::Ok(a)
},
result::Result::Err(e) => {
rendering.on_failed(&self, &env, &e)?;
result::Result::Err(e)
}
}
}
#[cfg(feature="cons_mode")]
fn run_with_executor_and_launcher<I, R, F, E, L>(&self,
generators: I,
rendering: R,
simulation: F,
executor: E,
launcher: L) -> simulation::Result<()>
where
I: IntoIterator<Item = Box<dyn ExperimentGenerator<R>>>,
R: ExperimentRendering + Send + 'static,
F: FnOnce(&LogicalProcessContext, ExperimentCont) -> simulation::Result<()> + Sync + Send + Clone + 'static,
E: ExperimentExecutor,
L: FnOnce(&LogicalProcessContext, SimulationBox<()>, usize, usize) -> simulation::Result<()> + Sync + Send + Clone + 'static
{
let env = rendering.prepare(self)?;
let reporters: Vec<_> = generators.into_iter()
.map(|x| { x.report(&rendering, self, &env) })
.collect();
let reporters = Arc::new(reporters);
for reporter in reporters.iter() {
reporter.initialise()?;
}
let simulate: Vec<_> = (0 .. self.run_count)
.map(|run_index| {
let run_count = self.run_count;
let simulation = simulation.clone();
let launcher = launcher.clone();
let reporters = reporters.clone();
let f: Box<dyn Fn(&LogicalProcessContext) -> simulation::Result<()> + Sync + Send> = {
Box::new(move |ctx| {
let simulation = simulation.clone();
let launcher = launcher.clone();
let reporters = reporters.clone();
simulation(ctx, Box::new(move |ctx, results| {
let comp = {
results
.and_then(move |results| {
ResultPredefinedObservableSet::new()
.and_then(move |predefined_observables| {
let d = ExperimentData { results, predefined_observables };
let comps: Vec<_> = reporters.iter()
.map(|x| { x.simulate(&d) })
.collect();
composite_sequence_(comps)
.run(empty_disposable())
.run_in_start_time_by(false)
.and_then(move |((), fs)| {
return_event(())
.run_in_stop_time()
.finally({
fs.into_event()
.run_in_stop_time()
})
})
})
.into_boxed()
})
.into_boxed()
};
launcher(ctx, comp, run_index, run_count)
}))
})
};
f
})
.collect();
let x = executor.execute(simulate);
for reporter in reporters.iter() {
reporter.finalise()?;
}
match x {
result::Result::Ok(a) => {
rendering.render(&self, &reporters, &env)?;
rendering.on_completed(&self, &env)?;
result::Result::Ok(a)
},
result::Result::Err(e) => {
rendering.on_failed(&self, &env, &e)?;
result::Result::Err(e)
}
}
}
}
#[cfg(feature="cons_mode")]
pub type ExperimentCont = Box<dyn FnOnce(&LogicalProcessContext, SimulationBox<ResultSet>) -> simulation::Result<()>>;
pub trait ExperimentRendering {
type ExperimentContext: Send;
type ExperimentEnvironment: Send;
fn prepare(&self, experiment: &Experiment) -> simulation::Result<Self::ExperimentEnvironment>;
fn render(&self, experiment: &Experiment, reporters: &Vec<Box<dyn ExperimentReporter<Self> + Sync + Send>>, env: &Self::ExperimentEnvironment) -> simulation::Result<()>
where Self: Sized;
fn on_completed(&self, experiment: &Experiment, env: &Self::ExperimentEnvironment) -> simulation::Result<()>;
fn on_failed(&self, experiment: &Experiment, env: &Self::ExperimentEnvironment, err: &simulation::error::Error) -> simulation::Result<()>;
}
pub trait ExperimentView<R: ExperimentRendering> {
fn view(&self) -> Box<dyn ExperimentGenerator<R>>;
}
pub trait ExperimentGenerator<R: ExperimentRendering> {
fn report(&self, rendering: &R, experiment: &Experiment, env: &R::ExperimentEnvironment) -> Box<dyn ExperimentReporter<R> + Sync + Send>;
}
#[derive(Clone)]
pub struct ExperimentData {
pub results: ResultSet,
pub predefined_observables: ResultPredefinedObservableSet
}
pub trait ExperimentReporter<R: ExperimentRendering> {
fn initialise(&self) -> simulation::Result<()>;
fn finalise(&self) -> simulation::Result<()>;
fn simulate(&self, xs: &ExperimentData) -> CompositeBox<()>;
fn context(&self) -> R::ExperimentContext;
}
pub trait ExperimentExecutor {
#[cfg(any(feature="seq_mode", feature="wasm_mode"))]
fn execute(self, models: Vec<Box<dyn Fn() -> simulation::Result<()> + Sync + Send>>) -> simulation::Result<()>;
#[cfg(feature="cons_mode")]
fn execute(self, models: Vec<Box<dyn Fn(&LogicalProcessContext) -> simulation::Result<()> + Sync + Send>>) -> simulation::Result<()>;
}
#[cfg(feature="seq_mode")]
pub enum BasicExperimentExecutor {
Seq,
Par
}
#[cfg(feature="wasm_mode")]
pub enum BasicExperimentExecutor {
Seq
}
#[cfg(feature="wasm_mode")]
pub enum BasicExperimentExecutor {
Seq
}
#[cfg(feature="seq_mode")]
impl Default for BasicExperimentExecutor {
fn default() -> Self {
BasicExperimentExecutor::Par
}
}
#[cfg(feature="wasm_mode")]
impl Default for BasicExperimentExecutor {
fn default() -> Self {
BasicExperimentExecutor::Seq
}
}
#[cfg(feature="wasm_mode")]
impl Default for BasicExperimentExecutor {
fn default() -> Self {
BasicExperimentExecutor::Seq
}
}
#[cfg(feature="seq_mode")]
impl ExperimentExecutor for BasicExperimentExecutor {
fn execute(self, models: Vec<Box<dyn Fn() -> simulation::Result<()> + Sync + Send>>) -> simulation::Result<()> {
match self {
BasicExperimentExecutor::Seq => {
for x in models {
let y = x();
GrcStorage::free_thread_local();
match y {
result::Result::Ok(()) => continue,
result::Result::Err(e) => return result::Result::Err(e)
}
}
result::Result::Ok(())
},
BasicExperimentExecutor::Par => {
models.par_iter()
.map(|x| {
match x() {
result::Result::Ok(()) => {},
result::Result::Err(e) => {
let _ = writeln!(io::stderr(), "Error: {:?}", e);
}
}
GrcStorage::free_thread_local();
0
})
.sum::<usize>();
result::Result::Ok(())
}
}
}
}
#[cfg(feature="wasm_mode")]
impl ExperimentExecutor for BasicExperimentExecutor {
fn execute(self, models: Vec<Box<dyn Fn() -> simulation::Result<()> + Sync + Send>>) -> simulation::Result<()> {
match self {
BasicExperimentExecutor::Seq => {
for x in models {
let y = x();
GrcStorage::free_thread_local();
match y {
result::Result::Ok(()) => continue,
result::Result::Err(e) => return result::Result::Err(e)
}
}
result::Result::Ok(())
}
}
}
}
#[cfg(feature="cons_mode")]
pub struct LogicalProcessExecutor {
pub network: Box<dyn Fn(usize) -> NetworkSupport>,
pub pids: Vec<LogicalProcessId>,
pub ps: LogicalProcessParameters
}
#[cfg(feature="cons_mode")]
impl ExperimentExecutor for LogicalProcessExecutor {
fn execute(self, models: Vec<Box<dyn Fn(&LogicalProcessContext) -> simulation::Result<()> + Sync + Send>>) -> simulation::Result<()> {
for (x, run_index) in models.into_iter().zip(0..) {
let x = move |ctx: &LogicalProcessContext| {
match x(ctx) {
result::Result::Ok(()) => {},
result::Result::Err(e) => {
log::error!("{:?}", e);
}
}
GrcStorage::free_thread_local();
};
{
let mut network = (self.network)(run_index);
let pids = &self.pids;
let ps = self.ps.clone();
network.barrier();
LogicalProcess::run(network, pids, ps, x);
}
}
result::Result::Ok(())
}
}