use crate::{
checked_rpc,
common::{
error::{err, inv_arg, inv_op, Result},
log::thread::LogThread,
protocol::{FrontendRunRequest, PluginToSimulator},
types::{ArbCmd, ArbData, PluginMetadata},
},
debug, error, fatal,
host::{
accelerator::Accelerator,
configuration::Seed,
plugin::Plugin,
reproduction::{HostCall, Reproduction},
},
info, trace,
};
use rand_chacha::{
rand_core::{RngCore, SeedableRng},
ChaChaRng,
};
use std::collections::VecDeque;
use std::path::Path;
pub type Pipeline = Vec<Box<dyn Plugin>>;
#[derive(Debug)]
struct InitializedPlugin {
pub plugin: Box<dyn Plugin>,
pub metadata: PluginMetadata,
}
#[derive(Debug, PartialEq)]
enum AcceleratorState {
Idle,
StartPending(ArbData),
Blocked,
WaitPending(ArbData),
}
impl AcceleratorState {
pub fn is_idle(&self) -> bool {
&AcceleratorState::Idle == self
}
pub fn is_start_pending(&self) -> bool {
matches!(self, AcceleratorState::StartPending(_))
}
pub fn is_blocked(&self) -> bool {
&AcceleratorState::Blocked == self
}
pub fn is_wait_pending(&self) -> bool {
matches!(self, AcceleratorState::WaitPending(_))
}
fn data(self) -> ArbData {
match self {
AcceleratorState::StartPending(x) => x,
AcceleratorState::WaitPending(x) => x,
_ => panic!("no data pending"),
}
}
pub fn put_data(&mut self, data: ArbData) -> Result<()> {
match self {
AcceleratorState::Idle => {
let _ = std::mem::replace(self, AcceleratorState::StartPending(data));
Ok(())
}
AcceleratorState::StartPending(_) => inv_op("data is already pending"),
AcceleratorState::Blocked => {
let _ = std::mem::replace(self, AcceleratorState::WaitPending(data));
Ok(())
}
AcceleratorState::WaitPending(_) => inv_op("data is already pending"),
}
}
pub fn take_data(&mut self) -> Result<ArbData> {
match self {
AcceleratorState::Idle => inv_op("no data pending"),
AcceleratorState::StartPending(_) => {
Ok(std::mem::replace(self, AcceleratorState::Blocked).data())
}
AcceleratorState::Blocked => inv_op("no data pending"),
AcceleratorState::WaitPending(_) => {
Ok(std::mem::replace(self, AcceleratorState::Idle).data())
}
}
}
}
#[derive(Debug)]
pub struct Simulation {
pipeline: Vec<InitializedPlugin>,
state: AcceleratorState,
host_to_accelerator_data: VecDeque<ArbData>,
accelerator_to_host_data: VecDeque<ArbData>,
reproduction_log: Option<Reproduction>,
}
impl Simulation {
pub fn new(
mut pipeline: Pipeline,
seed: Seed,
reproduction_log: Option<Reproduction>,
logger: &LogThread,
) -> Result<Simulation> {
trace!("Constructing Simulation");
if pipeline.len() < 2 {
inv_arg("Simulation must consist of at least a frontend and backend")?
}
info!("Starting Simulation with seed: {}", seed);
let (_, errors): (_, Vec<Result<()>>) = pipeline
.iter_mut()
.map(|plugin| plugin.spawn(logger))
.partition(Result::is_ok);
if !errors.is_empty() {
for error in errors {
fatal!("{}", error.as_ref().unwrap_err());
}
err("Failed to spawn plugin(s)")?
}
let mut downstream = None;
let mut metadata = vec![];
let mut rng = ChaChaRng::seed_from_u64(seed.value);
for plugin in pipeline.iter_mut().rev() {
let res = plugin.initialize(logger, &downstream, rng.next_u64())?;
downstream = res.upstream;
metadata.push(res.metadata);
}
for plugin in pipeline.iter_mut().skip(1).rev() {
plugin.accept_upstream()?
}
for plugin in pipeline.iter_mut().rev() {
plugin.user_initialize()?
}
let pipeline: Vec<_> = pipeline
.into_iter()
.zip(metadata.into_iter().rev())
.map(|(plugin, metadata)| InitializedPlugin { plugin, metadata })
.collect();
for (i, p) in pipeline.iter().enumerate() {
debug!(
"Plugin {} with instance name {} is {}",
i,
p.plugin.name(),
p.metadata,
);
}
Ok(Simulation {
pipeline,
state: AcceleratorState::Idle,
host_to_accelerator_data: VecDeque::new(),
accelerator_to_host_data: VecDeque::new(),
reproduction_log,
})
}
pub fn drop_plugins(&mut self) {
trace!("Implicit yield() prior to dropping plugins...");
if let Err(e) = self.internal_yield() {
error!("Implicit yield to frontend failed: {}", e.to_string());
}
trace!("Dropping plugins...");
for p in self.pipeline.drain(..) {
let name = p.plugin.name();
let _ = p;
trace!("Dropped {}...", name);
}
}
#[allow(clippy::borrowed_box)]
pub fn accelerator(&self) -> &Box<dyn Plugin> {
unsafe { &self.pipeline.get_unchecked(0).plugin }
}
#[allow(clippy::borrowed_box)]
pub fn accelerator_mut(&mut self) -> &mut Box<dyn Plugin> {
unsafe { &mut self.pipeline.get_unchecked_mut(0).plugin }
}
fn internal_yield(&mut self) -> Result<()> {
let start = if self.state.is_start_pending() {
Some(self.state.take_data().unwrap())
} else {
None
};
let messages = self.host_to_accelerator_data.drain(..).collect();
let response = checked_rpc!(
self.accelerator_mut(),
FrontendRunRequest {
start,
messages,
},
expect RunResponse
)?;
self.accelerator_to_host_data.extend(response.messages);
if let Some(return_value) = response.return_value {
if !self.state.is_blocked() {
return err("Protocol error: unexpected run() return value");
}
self.state.put_data(return_value).unwrap();
}
Ok(())
}
fn record_host_call(&mut self, host_call: HostCall) {
if let Some(log) = self.reproduction_log.as_mut() {
debug!("recording host call to reproduction log: {:?}", &host_call);
log.record(host_call);
}
}
pub fn yield_to_accelerator(&mut self) -> Result<()> {
self.record_host_call(HostCall::Yield);
self.internal_yield()
}
pub fn arb(&mut self, name: impl AsRef<str>, cmd: impl Into<ArbCmd>) -> Result<ArbData> {
let name = name.as_ref();
for (i, p) in self.pipeline.iter().enumerate() {
if p.plugin.name() == name {
return self.arb_idx(i as isize, cmd);
}
}
inv_arg(format!("plugin {} not found", name))
}
fn convert_plugin_index(&self, index: isize) -> Result<usize> {
let mut conv_index = index;
let n_plugins = self.pipeline.len();
if conv_index < 0 {
conv_index += n_plugins as isize;
if conv_index < 0 {
inv_arg(format!("index {} out of range", index))?
}
}
let conv_index = conv_index as usize;
if conv_index >= n_plugins {
inv_arg(format!("index {} out of range", index))?
}
Ok(conv_index)
}
pub fn arb_idx(&mut self, index: isize, cmd: impl Into<ArbCmd>) -> Result<ArbData> {
let index = self.convert_plugin_index(index)?;
let cmd = cmd.into();
self.record_host_call(HostCall::Arb(
self.pipeline[index].plugin.name(),
cmd.clone(),
));
self.internal_yield()?;
self.pipeline[index].plugin.arb(cmd)
}
pub fn get_metadata(&self, name: impl AsRef<str>) -> Result<&PluginMetadata> {
let name = name.as_ref();
for (i, p) in self.pipeline.iter().enumerate() {
if p.plugin.name() == name {
return self.get_metadata_idx(i as isize);
}
}
inv_arg(format!("plugin {} not found", name))
}
pub fn get_metadata_idx(&self, index: isize) -> Result<&PluginMetadata> {
Ok(&self.pipeline[self.convert_plugin_index(index)?].metadata)
}
pub fn write_reproduction_file(&self, filename: impl AsRef<Path>) -> Result<()> {
if let Some(log) = &self.reproduction_log {
log.to_file(filename)
} else {
inv_op(
"cannot output reproduction file; \
we failed earlier on when attempting to construct the logger.",
)
}
}
}
impl Accelerator for Simulation {
fn start(&mut self, args: impl Into<ArbData>) -> Result<()> {
if self.state.is_idle() {
let args = args.into();
self.record_host_call(HostCall::Start(args.clone()));
self.state.put_data(args).unwrap();
Ok(())
} else {
inv_op("accelerator is already running; call wait() first")
}
}
fn wait(&mut self) -> Result<ArbData> {
if self.state.is_idle() {
inv_op("accelerator is not running; call start() first")
} else {
self.record_host_call(HostCall::Wait);
if self.state.is_wait_pending() {
self.state.take_data()
} else {
self.internal_yield()?;
if self.state.is_wait_pending() {
self.state.take_data()
} else {
err("Deadlock: accelerator is blocked on recv() while we are expecting it to return")
}
}
}
}
fn send(&mut self, args: impl Into<ArbData>) -> Result<()> {
let args = args.into();
self.record_host_call(HostCall::Send(args.clone()));
self.host_to_accelerator_data.push_back(args);
Ok(())
}
fn recv(&mut self) -> Result<ArbData> {
if self.state.is_idle() && self.accelerator_to_host_data.is_empty() {
err("Deadlock: recv() called while queue is empty and accelerator is idle")
} else {
self.record_host_call(HostCall::Recv);
if let Some(data) = self.accelerator_to_host_data.pop_front() {
Ok(data)
} else {
self.internal_yield()?;
if let Some(data) = self.accelerator_to_host_data.pop_front() {
Ok(data)
} else {
err("Deadlock: accelerator exited before sending data")
}
}
}
}
}
impl Drop for Simulation {
fn drop(&mut self) {
trace!("Dropping Simulation");
}
}