use crate::{
common::{
error::{err, inv_arg, inv_op, oe_err, Result},
protocol::{
FrontendRunRequest, FrontendRunResponse, GatestreamDown, GatestreamUp,
PipelinedGatestreamDown, PluginInitializeRequest, PluginInitializeResponse,
PluginToSimulator, SimulatorToPlugin,
},
types::{
ArbCmd, ArbData, Cycle, Cycles, Gate, PluginType, QubitMeasurementResult,
QubitMeasurementValue, QubitRef, QubitRefGenerator, SequenceNumber,
SequenceNumberGenerator,
},
util::friendly_enumerate,
},
debug, error, fatal,
plugin::{
connection::{Connection, IncomingMessage, OutgoingMessage},
definition::PluginDefinition,
log::setup_logging,
},
trace, warn,
};
use rand::{distributions::Standard, Rng};
use rand_chacha::{
rand_core::{RngCore, SeedableRng},
ChaChaRng,
};
use std::collections::{HashMap, HashSet, VecDeque};
/// A set of pseudorandom number streams of which exactly one is selected at
/// any time; `random_u64()` and `random_f64()` draw from the selected stream.
struct RandomNumberGenerator {
/// One generator per stream.
rngs: Vec<ChaChaRng>,
/// Index into `rngs` of the currently selected stream.
selected: usize,
}
impl RandomNumberGenerator {
pub fn new(num_streams: usize, seed: u64) -> RandomNumberGenerator {
let mut rng = ChaChaRng::seed_from_u64(seed);
let mut rngs = vec![];
for _ in 1..num_streams {
rngs.push(ChaChaRng::seed_from_u64(rng.next_u64()));
}
rngs.push(rng);
RandomNumberGenerator { rngs, selected: 0 }
}
pub fn select(&mut self, index: usize) {
assert!(index < self.rngs.len());
self.selected = index;
}
pub fn get_selected(&self) -> usize {
self.selected
}
pub fn random_u64(&mut self) -> u64 {
self.rngs[self.selected].next_u64()
}
pub fn random_f64(&mut self) -> f64 {
self.rngs[self.selected].sample(Standard)
}
}
/// Cached result of the most recent measurement of a downstream qubit.
#[derive(Debug, Clone)]
struct QubitMeasurementData {
/// The measured value as reported by the downstream plugin.
value: QubitMeasurementValue,
/// Copy of the arbitrary data that was attached to the measurement.
data: ArbData,
/// Value of `downstream_cycle_rx` at the moment the measurement was cached.
timestamp: Cycle,
/// Number of cycles between this measurement and the previously cached one
/// for the same qubit; `None` if there was no previous measurement.
timer: Option<Cycles>,
}
/// Bookkeeping that is kept for every live downstream qubit.
#[derive(Debug, Clone)]
struct QubitData {
/// The latest cached measurement; `None` until a measurement was received.
measurement: Option<QubitMeasurementData>,
/// Sequence number of the latest downstream gate that measures this qubit
/// (`SequenceNumber::none()` right after allocation). The measurement
/// getters synchronize with downstream up to this number before reading
/// `measurement`.
last_mutation: SequenceNumber,
}
/// Runtime state of a plugin: its connections, queues and the bookkeeping
/// needed to keep the upstream and downstream gatestreams in sync.
pub struct PluginState<'a> {
/// The plugin definition providing the plugin type, metadata and callbacks.
definition: &'a PluginDefinition,
/// Connection used to exchange messages with the simulator and the
/// upstream/downstream plugins.
connection: Connection,
/// Set while the `run` callback of the definition is executing.
inside_run: bool,
/// Set while handling a simulator or upstream request (and after an
/// explicit downstream synchronization); cleared while handling a message
/// from downstream. The downstream-facing API functions refuse to run when
/// this is cleared.
synchronized_to_rpcs: bool,
/// Messages queued by `send()`, drained into the next run response.
frontend_to_host_data: VecDeque<ArbData>,
/// Messages received through run requests, consumed by `recv()`.
host_to_frontend_data: VecDeque<ArbData>,
/// Random number generator; `None` until `handle_init()` seeds it. Stream 0
/// is selected for simulator requests, 1 for upstream requests and 2 for
/// downstream messages.
rng: Option<RandomNumberGenerator>,
/// Generates/frees qubit references for upstream allocate/free requests.
upstream_qubit_ref_generator: QubitRefGenerator,
/// Sequence number of the latest upstream request that was handled.
upstream_issued_up_to: SequenceNumber,
/// Entries pushed by operators for every upstream request:
/// (downstream sequence number that must be acknowledged first, upstream
/// sequence number of the request, measurements to send upstream once the
/// downstream sequence number is acknowledged).
upstream_postponed: VecDeque<(SequenceNumber, SequenceNumber, Vec<QubitMeasurementResult>)>,
/// The latest sequence number reported upstream via `CompletedUpTo`.
upstream_completed_up_to: SequenceNumber,
/// Generator for the sequence numbers of requests sent downstream.
downstream_sequence_tx: SequenceNumberGenerator,
/// The latest sequence number reported by downstream via `CompletedUpTo`.
downstream_sequence_rx: SequenceNumber,
/// Simulation time as advanced by `advance()` requests sent downstream.
downstream_cycle_tx: Cycle,
/// Simulation time as advanced by `Advanced` messages from downstream.
downstream_cycle_rx: Cycle,
/// Generates/frees qubit references for downstream allocate/free requests.
downstream_qubit_ref_generator: QubitRefGenerator,
/// Bookkeeping for every currently allocated downstream qubit.
downstream_qubit_data: HashMap<QubitRef, QubitData>,
/// Measurements received from downstream that are processed when the next
/// `CompletedUpTo` message arrives.
downstream_measurement_queue: VecDeque<QubitMeasurementResult>,
/// For every measuring gate sent downstream: its sequence number and the
/// qubits for which a measurement result is still expected.
downstream_expected_measurements: VecDeque<(SequenceNumber, HashSet<QubitRef>)>,
/// Set once an abort request was received from the simulator.
aborted: bool,
}
impl<'a> PluginState<'a> {
fn handle_init(&mut self, req: PluginInitializeRequest) -> Result<PluginInitializeResponse> {
let typ = self.definition.get_type();
let seed = req.seed;
setup_logging(&req.log_configuration, req.log_channel)?;
trace!("started handle_init()!");
trace!("seeding with value {}", seed);
self.rng.replace(RandomNumberGenerator::new(3, seed));
if typ != req.plugin_type {
inv_op(format!(
"host is expecting a plugin of type {:?}, but we're a plugin of type {:?}",
req.plugin_type, typ
))?;
}
if typ != PluginType::Backend {
self.connection
.connect_downstream(req.downstream.unwrap())?;
}
let upstream = if typ == PluginType::Frontend {
None
} else {
Some(self.connection.serve_upstream()?)
};
trace!("finished handle_init()!");
Ok(PluginInitializeResponse {
upstream,
metadata: self.definition.get_metadata().clone(),
})
}
/// Handles the simulator's request to accept the upstream connection,
/// tracing before and after the attempt and passing its result through.
fn handle_accept_upstream(&mut self) -> Result<()> {
    trace!("started accept_upstream()!");
    let outcome = self.connection.accept_upstream();
    trace!("finished accept_upstream()!");
    outcome
}
/// Handles the simulator's abort request: synchronizes with downstream,
/// runs the `drop` callback of the plugin definition, and synchronizes
/// again. The first failing step aborts the sequence and is returned.
fn handle_abort(&mut self) -> Result<()> {
    trace!("started handle_abort()!");
    self.synchronize_downstream()
        .and_then(|_| (self.definition.drop)(self))
        .and_then(|_| self.synchronize_downstream())?;
    trace!("finished handle_abort()!");
    Ok(())
}
/// Handles a run request from the simulator.
///
/// The messages in the request are queued for `recv()`. If the request
/// carries start arguments, the `run` callback is executed with them and
/// its return value is included in the response. All messages queued by
/// `send()` are moved into the response.
///
/// # Panics
///
/// Panics when called while the `run` callback is already executing.
fn handle_run(&mut self, req: FrontendRunRequest) -> Result<FrontendRunResponse> {
    assert!(
        !self.inside_run,
        "handle_run() can only be used outside of the run() callback"
    );
    if self.definition.get_type() != PluginType::Frontend {
        inv_op("received run request from simulator, but we're not a frontend!")?;
    }

    self.host_to_frontend_data.extend(req.messages);

    let return_value = match req.start {
        Some(args) => {
            // The flag must be cleared again even when the callback fails,
            // so the error is only propagated afterwards.
            self.inside_run = true;
            let outcome = (self.definition.run)(self, args);
            self.inside_run = false;
            Some(outcome?)
        }
        None => None,
    };

    Ok(FrontendRunResponse {
        return_value,
        messages: self.frontend_to_host_data.drain(..).collect(),
    })
}
/// Caches a measurement result received from downstream.
///
/// Measurements for qubits without bookkeeping data are only traced. For
/// operators, the measurement is additionally passed through the
/// `modify_measurement` callback and every measurement it returns is sent
/// upstream.
///
/// # Panics
///
/// Panics when the downstream simulation time is earlier than the timestamp
/// of the previously cached measurement of the same qubit.
fn handle_measurement(&mut self, measurement: QubitMeasurementResult) -> Result<()> {
    let entry = match self.downstream_qubit_data.get_mut(&measurement.qubit) {
        Some(entry) => entry,
        None => {
            trace!(
                "Not caching measurement for qubit {}; no data exists (anymore)",
                measurement.qubit
            );
            return Ok(());
        }
    };
    trace!("Caching measurement for qubit {}...", measurement.qubit);

    // The time between measurements is only known from the second
    // measurement of a qubit onwards.
    let timestamp = self.downstream_cycle_rx;
    let timer = entry.measurement.as_ref().map(|previous| {
        let delta = timestamp - previous.timestamp;
        if delta < 0 {
            panic!("simulation time is apparently not monotonous?");
        }
        delta as Cycles
    });
    entry.measurement = Some(QubitMeasurementData {
        value: measurement.value,
        data: measurement.data.clone(),
        timestamp,
        timer,
    });

    if self.definition.get_type() == PluginType::Operator {
        let modified = (self.definition.modify_measurement)(self, measurement)?;
        for upstream_measurement in modified {
            let message = GatestreamUp::Measured(upstream_measurement);
            self.connection.send(OutgoingMessage::Upstream(message))?;
        }
    }
    Ok(())
}
/// Processes a `CompletedUpTo(sequence)` message from downstream.
///
/// Queued measurements are matched against the oldest group of expected
/// measurements; unexpected ones are dropped with a warning. Afterwards,
/// every expected group that `sequence` acknowledges is removed, and qubits
/// in it that are still allocated but received no measurement are given an
/// undefined measurement. Finally the upstream completion state is updated.
fn received_downstream_sequence(&mut self, sequence: SequenceNumber) -> Result<()> {
    trace!("Downstream completed up to {}", sequence);
    self.downstream_sequence_rx = sequence;

    let received: Vec<_> = self.downstream_measurement_queue.drain(..).collect();
    for measurement in received {
        // `Some(done)` when the measurement was expected, where `done`
        // tells whether it was the last one of its group; `None` otherwise.
        let mut matched = None;
        if let Some((expected_seq, qubits)) = self.downstream_expected_measurements.front_mut() {
            if sequence.acknowledges(*expected_seq) && qubits.remove(&measurement.qubit) {
                matched = Some(qubits.is_empty());
            }
        }
        match matched {
            Some(group_done) => {
                self.handle_measurement(measurement)?;
                if group_done {
                    self.downstream_expected_measurements.pop_front().unwrap();
                }
            }
            None => {
                warn!(
                    "ignored unexpected measurement data for qubit {}; bug in downstream plugin!",
                    measurement.qubit
                );
            }
        }
    }

    // Whatever is still expected for acknowledged gates was never sent.
    while self
        .downstream_expected_measurements
        .front()
        .map_or(false, |entry| sequence.acknowledges(entry.0))
    {
        let (_, mut missing) = self.downstream_expected_measurements.pop_front().unwrap();
        for qubit in missing.drain() {
            if !self.downstream_qubit_data.contains_key(&qubit) {
                trace!(
                    "missing measurement data for qubit {}, which has already been deallocated",
                    qubit
                );
                continue;
            }
            warn!(
                "missing measurement data for qubit {}, setting to undefined; bug in downstream plugin!",
                qubit
            );
            self.handle_measurement(QubitMeasurementResult::new(
                qubit,
                QubitMeasurementValue::Undefined,
                ArbData::default(),
            ))?;
        }
    }

    self.check_completed_up_to()
}
/// Releases postponed upstream measurements and reports progress upstream.
///
/// Postponed entries whose downstream sequence number has been acknowledged
/// are removed in order and their measurements sent upstream. The reported
/// completion is the latest issued upstream sequence number, limited to
/// just before the oldest entry that is still postponed. A `CompletedUpTo`
/// message is only sent when this is after what was reported before.
fn check_completed_up_to(&mut self) -> Result<()> {
    while let Some(required) = self.upstream_postponed.front().map(|entry| entry.0) {
        if !self.downstream_sequence_rx.acknowledges(required) {
            break;
        }
        let (_, _, released) = self.upstream_postponed.pop_front().unwrap();
        for measurement in released {
            let message = GatestreamUp::Measured(measurement);
            self.connection.send(OutgoingMessage::Upstream(message))?;
        }
    }

    let mut completed_up_to = self.upstream_issued_up_to;
    let blocked_before = self
        .upstream_postponed
        .front()
        .map(|entry| entry.1.preceding());
    if let Some(limit) = blocked_before {
        if completed_up_to.after(limit) {
            completed_up_to = limit;
        }
    }

    if completed_up_to.after(self.upstream_completed_up_to) {
        trace!("We've completed up to {}", completed_up_to);
        let message = GatestreamUp::CompletedUpTo(completed_up_to);
        self.connection.send(OutgoingMessage::Upstream(message))?;
        self.upstream_completed_up_to = completed_up_to;
    }
    Ok(())
}
/// Handles a message received from the downstream plugin.
///
/// Random number stream 2 is selected and the state is marked as not
/// synchronized to RPCs for the duration of the handling. Measurements are
/// queued, `Advanced` moves the received simulation time forward, and
/// `CompletedUpTo` triggers processing of the queued measurements.
///
/// # Errors
///
/// A `Failure` message or any other kind of message results in an error.
fn handle_downstream_message(&mut self, message: GatestreamUp) -> Result<()> {
    if let Some(rng) = self.rng.as_mut() {
        rng.select(2);
    }
    self.synchronized_to_rpcs = false;

    match message {
        GatestreamUp::Measured(measurement) => {
            trace!(
                "Downstream sent measurement for qubit {}",
                measurement.qubit
            );
            self.downstream_measurement_queue.push_back(measurement);
            Ok(())
        }
        GatestreamUp::Advanced(cycles) => {
            self.downstream_cycle_rx = self.downstream_cycle_rx.advance(cycles);
            Ok(())
        }
        GatestreamUp::CompletedUpTo(sequence) => self.received_downstream_sequence(sequence),
        GatestreamUp::Failure(sequence, message) => {
            error!("Error from downstream plugin: {}", message);
            debug!("The sequence number was {}", sequence);
            fatal!("Desynchronized with downstream plugin due to downstream error, cannot continue!");
            err(format!(
                "simulation failed due to downstream error: {}",
                message
            ))
        }
        unexpected => {
            error!("Unexpected message received from downstream");
            trace!("{:?}", unexpected);
            err("unexpected message received from downstream")
        }
    }
}
/// Dispatches a single incoming message.
///
/// Returns `Ok(true)` once an abort request has been received, which tells
/// the caller to stop processing messages. When the plugin has already been
/// aborted, the message is ignored.
///
/// Errors from the handlers of simulator and upstream requests are logged
/// and reported to the requester as failure messages; only errors from
/// sending messages, synchronizing with downstream, or handling downstream
/// messages are returned from this function.
fn handle_incoming_message(&mut self, request: IncomingMessage) -> Result<bool> {
if !self.aborted {
match request {
IncomingMessage::Simulator(message) => {
// Requests from the simulator use random number stream 0.
if let Some(ref mut rng) = self.rng {
rng.select(0)
}
self.synchronized_to_rpcs = true;
trace!("Received a request from the host");
let response = OutgoingMessage::Simulator(match message {
SimulatorToPlugin::Initialize(req) => match self.handle_init(*req) {
Ok(x) => PluginToSimulator::Initialized(x),
Err(e) => {
let e = e.to_string();
error!("{}", e);
PluginToSimulator::Failure(e)
}
},
SimulatorToPlugin::AcceptUpstream => match self.handle_accept_upstream() {
Ok(_) => PluginToSimulator::Success,
Err(e) => {
let e = e.to_string();
error!("{}", e);
PluginToSimulator::Failure(e)
}
},
SimulatorToPlugin::UserInitialize(req) => {
match (self.definition.initialize)(self, req.init_cmds) {
Ok(_) => PluginToSimulator::Success,
Err(e) => {
let e = e.to_string();
error!("{}", e);
PluginToSimulator::Failure(e)
}
}
}
SimulatorToPlugin::Abort => {
// The flag is set before the handler runs, so this function reports
// the abort to its caller even when the handler fails.
self.aborted = true;
match self.handle_abort() {
Ok(_) => PluginToSimulator::Success,
Err(e) => {
let e = e.to_string();
error!("{}", e);
PluginToSimulator::Failure(e)
}
}
}
SimulatorToPlugin::RunRequest(req) => match self.handle_run(req) {
Ok(x) => PluginToSimulator::RunResponse(x),
Err(e) => {
let e = e.to_string();
error!("{}", e);
PluginToSimulator::Failure(e)
}
},
SimulatorToPlugin::ArbRequest(req) => {
match (self.definition.host_arb)(self, req) {
Ok(x) => PluginToSimulator::ArbResponse(x),
Err(e) => {
let e = e.to_string();
error!("{}", e);
PluginToSimulator::Failure(e)
}
}
}
});
// Wait for downstream to catch up before handing control back to the host.
self.synchronize_downstream()?;
trace!("Returning control to the host");
self.connection.send(response)?;
}
IncomingMessage::Upstream(GatestreamDown::Pipelined(sequence, message)) => {
// Requests from upstream use random number stream 1.
if let Some(ref mut rng) = self.rng {
rng.select(1)
}
self.synchronized_to_rpcs = true;
trace!("Received request {} from upstream", sequence);
// Measurements returned by the gate() callback for this request.
let mut queued_measurements = vec![];
let response = match message {
PipelinedGatestreamDown::Allocate(num_qubits, commands) => {
let qubits = self.upstream_qubit_ref_generator.allocate(num_qubits);
(self.definition.allocate)(self, qubits, commands)
}
PipelinedGatestreamDown::Free(qubits) => {
self.upstream_qubit_ref_generator.free(qubits.clone());
(self.definition.free)(self, qubits)
}
PipelinedGatestreamDown::Gate(gate) => {
// Qubits for which the gate still needs a measurement result.
let mut measures: HashSet<_> =
gate.get_measures().iter().cloned().collect();
(self.definition.gate)(self, gate).and_then(|measurements| {
for measurement in measurements {
if measures.remove(&measurement.qubit) {
queued_measurements.push(measurement);
} else {
err(format!(
"user-defined gate() function returned multiple measurements for qubit {}",
measurement.qubit
))?;
}
}
// Only operators may leave measurements unreported here.
if !measures.is_empty() {
if self.definition.get_type() == PluginType::Operator {
trace!("Postponing measurement results for {} until downstream {}",
sequence, self.downstream_sequence_tx.get_previous());
} else {
err(format!(
"user-defined gate() function failed to return measurement for qubits {}",
friendly_enumerate(measures.into_iter(), Some("or"))
))?;
}
}
Ok(())
})
}
PipelinedGatestreamDown::Advance(cycles) => self
.connection
.send(OutgoingMessage::Upstream(GatestreamUp::Advanced(cycles)))
.and_then(|_| (self.definition.advance)(self, cycles)),
};
// A failing callback is reported upstream; processing continues below.
if let Err(e) = response {
let e = e.to_string();
error!("{}", e);
self.connection
.send(OutgoingMessage::Upstream(GatestreamUp::Failure(
sequence, e,
)))?;
}
self.upstream_issued_up_to = sequence;
trace!("We've just finished issuing {}", sequence);
if self.definition.get_type() == PluginType::Operator {
// Operators postpone the acknowledgement (and the measurements) of this
// request until downstream has acknowledged everything sent so far.
let back_sequence = self.downstream_sequence_tx.get_previous();
self.upstream_postponed.push_back((
back_sequence,
sequence,
queued_measurements,
));
trace!(
"Downstream needs to complete up to {} to ack {}",
back_sequence,
sequence
);
} else {
for measurement in queued_measurements {
self.connection.send(OutgoingMessage::Upstream(
GatestreamUp::Measured(measurement),
))?;
}
}
self.check_completed_up_to()?;
}
IncomingMessage::Upstream(GatestreamDown::ArbRequest(cmd)) => {
if let Some(ref mut rng) = self.rng {
rng.select(1)
}
self.synchronized_to_rpcs = true;
let response = match (self.definition.upstream_arb)(self, cmd) {
Ok(r) => GatestreamUp::ArbSuccess(r),
Err(e) => GatestreamUp::ArbFailure(e.to_string()),
};
self.connection.send(OutgoingMessage::Upstream(response))?;
}
IncomingMessage::Downstream(message) => self.handle_downstream_message(message)?,
}
}
Ok(self.aborted)
}
/// Handles downstream messages until downstream has reported completion of
/// sequence number `num`.
///
/// # Errors
///
/// Fails with "Simulation aborted" when no more downstream messages can be
/// received, or with whatever error handling a downstream message produces.
///
/// # Panics
///
/// Panics when the connection hands out a message that does not originate
/// from downstream.
fn _synchronize_downstream_up_to(&mut self, num: SequenceNumber) -> Result<()> {
    while num.after(self.downstream_sequence_rx) {
        let incoming = self.connection.next_downstream_request()?;
        match incoming {
            None => return err("Simulation aborted"),
            Some(IncomingMessage::Downstream(message)) => {
                self.handle_downstream_message(message)?;
            }
            Some(_) => panic!("next_downstream_request() returned a non-downstream message"),
        }
    }
    Ok(())
}
/// Waits until downstream has completed sequence number `num`.
///
/// Handling downstream messages switches the selected random number stream
/// and clears the synchronized-to-RPCs flag; both are restored afterwards
/// regardless of whether the synchronization succeeded.
fn synchronize_downstream_up_to(&mut self, num: SequenceNumber) -> Result<()> {
    let stream_to_restore = match self.rng.as_ref() {
        Some(rng) => rng.get_selected(),
        None => 0,
    };

    trace!("Syncing up to {}", num);
    let outcome = self._synchronize_downstream_up_to(num);
    trace!("Synced up to {}", num);

    if let Some(rng) = self.rng.as_mut() {
        rng.select(stream_to_restore);
    }
    self.synchronized_to_rpcs = true;
    outcome
}
/// Waits until downstream has completed the most recently sent request.
fn synchronize_downstream(&mut self) -> Result<()> {
    let latest_sent = self.downstream_sequence_tx.get_previous();
    self.synchronize_downstream_up_to(latest_sent)
}
/// Checks that all given qubits are currently allocated downstream.
///
/// # Errors
///
/// Returns an invalid-argument error naming the first qubit for which no
/// bookkeeping data exists.
fn check_qubits_live<'b, 'c>(
    &'b self,
    qubits: impl IntoIterator<Item = &'c QubitRef>,
) -> Result<()> {
    let first_dead = qubits
        .into_iter()
        .find(|qubit| !self.downstream_qubit_data.contains_key(*qubit));
    match first_dead {
        Some(qubit) => inv_arg(format!("qubit {} is not allocated", qubit)),
        None => Ok(()),
    }
}
/// Connects to the given simulator and handles incoming requests for the
/// given plugin definition until either no more requests are available or
/// an abort request has been handled.
///
/// # Errors
///
/// Returns an error when the connection cannot be set up, when receiving a
/// request fails, or when handling a request returns an error.
pub fn run(definition: &'a PluginDefinition, simulator: impl Into<String>) -> Result<()> {
    let connection = Connection::new(simulator)?;
    let mut state = PluginState {
        definition,
        connection,
        inside_run: false,
        synchronized_to_rpcs: true,
        frontend_to_host_data: VecDeque::new(),
        host_to_frontend_data: VecDeque::new(),
        rng: None,
        upstream_qubit_ref_generator: QubitRefGenerator::new(),
        upstream_issued_up_to: SequenceNumber::none(),
        upstream_postponed: VecDeque::new(),
        upstream_completed_up_to: SequenceNumber::none(),
        downstream_sequence_tx: SequenceNumberGenerator::new(),
        downstream_sequence_rx: SequenceNumber::none(),
        downstream_cycle_tx: Cycle::t_zero(),
        downstream_cycle_rx: Cycle::t_zero(),
        downstream_qubit_ref_generator: QubitRefGenerator::new(),
        downstream_qubit_data: HashMap::new(),
        downstream_measurement_queue: VecDeque::new(),
        downstream_expected_measurements: VecDeque::new(),
        aborted: false,
    };

    loop {
        let request = match state.connection.next_request()? {
            Some(request) => request,
            None => break,
        };
        let aborted = state.handle_incoming_message(request)?;
        if aborted {
            break;
        }
    }
    Ok(())
}
/// Queues a message for the host; it is delivered with the next run
/// response.
///
/// # Errors
///
/// Returns an invalid-operation error when called outside of the `run`
/// callback.
pub fn send(&mut self, msg: ArbData) -> Result<()> {
    if self.inside_run {
        self.frontend_to_host_data.push_back(msg);
        Ok(())
    } else {
        inv_op("send() can only be called from inside the run() callback")
    }
}
/// Returns the next message sent by the host, waiting for one if needed.
///
/// When no message is queued, this synchronizes with downstream, hands
/// control back to the host by sending a run response (carrying all
/// messages queued by `send()` but no return value), and then handles
/// incoming requests until a run request delivers at least one message.
///
/// # Errors
///
/// Returns an error when called outside of the `run` callback, when the
/// run response cannot be sent, when the simulation is aborted while
/// waiting, when the host tries to start the accelerator again while it is
/// running, or when handling an interleaved request fails.
pub fn recv(&mut self) -> Result<ArbData> {
    if !self.inside_run {
        inv_op("recv() can only be called from inside the run() callback")?;
    }
    while self.host_to_frontend_data.is_empty() {
        // Yield to the host. A failure to send is propagated to the caller
        // like every other connection error instead of panicking.
        self.synchronize_downstream()?;
        self.connection
            .send(OutgoingMessage::Simulator(PluginToSimulator::RunResponse(
                FrontendRunResponse {
                    return_value: None,
                    messages: self.frontend_to_host_data.drain(..).collect(),
                },
            )))?;

        // Handle requests until the host sends the next run request.
        while self.host_to_frontend_data.is_empty() {
            let request = self
                .connection
                .next_request()?
                .ok_or_else(oe_err("Simulation aborted"))?;
            if let IncomingMessage::Simulator(SimulatorToPlugin::RunRequest(request)) = request
            {
                // Same per-request setup as for other simulator requests.
                if let Some(ref mut rng) = self.rng {
                    rng.select(0)
                }
                self.synchronized_to_rpcs = true;
                self.host_to_frontend_data.extend(request.messages);
                if request.start.is_some() {
                    return err("Protocol error: cannot start accelerator while accelerator is already running");
                }
                // A run request without messages makes the outer loop yield
                // to the host again.
                break;
            } else if self.handle_incoming_message(request)? {
                return err("Simulation aborted");
            }
        }
    }
    Ok(self.host_to_frontend_data.pop_front().unwrap())
}
/// Allocates `num_qubits` downstream qubits and returns their references.
///
/// Bookkeeping data is created for every new qubit and an allocate request
/// carrying `commands` is sent downstream.
///
/// # Errors
///
/// Returns an invalid-operation error for backends and while a gatestream
/// response is being handled, or the error from sending the request.
pub fn allocate(&mut self, num_qubits: usize, commands: Vec<ArbCmd>) -> Result<Vec<QubitRef>> {
    if self.definition.get_type() == PluginType::Backend {
        return inv_op("allocate() is not available for backends");
    }
    if !self.synchronized_to_rpcs {
        return inv_op("allocate() cannot be called while handling a gatestream response");
    }

    let qubits = self.downstream_qubit_ref_generator.allocate(num_qubits);
    self.downstream_qubit_data
        .extend(qubits.iter().cloned().map(|qubit| {
            let fresh = QubitData {
                measurement: None,
                last_mutation: SequenceNumber::none(),
            };
            (qubit, fresh)
        }));

    let sequence = self.downstream_sequence_tx.get_next();
    let request = PipelinedGatestreamDown::Allocate(num_qubits, commands);
    self.connection
        .send(OutgoingMessage::Downstream(GatestreamDown::Pipelined(
            sequence, request,
        )))?;
    Ok(qubits)
}
/// Frees the given downstream qubits.
///
/// After checking that all qubits are allocated, a free request is sent
/// downstream, the bookkeeping data of the qubits is removed and their
/// references are handed back to the reference generator.
///
/// # Errors
///
/// Returns an invalid-operation error for backends and while a gatestream
/// response is being handled, an invalid-argument error when a qubit is not
/// allocated, or the error from sending the request.
pub fn free(&mut self, qubits: Vec<QubitRef>) -> Result<()> {
    if self.definition.get_type() == PluginType::Backend {
        return inv_op("free() is not available for backends");
    }
    if !self.synchronized_to_rpcs {
        return inv_op("free() cannot be called while handling a gatestream response");
    }
    self.check_qubits_live(qubits.iter())?;

    let sequence = self.downstream_sequence_tx.get_next();
    let request = PipelinedGatestreamDown::Free(qubits.clone());
    self.connection
        .send(OutgoingMessage::Downstream(GatestreamDown::Pipelined(
            sequence, request,
        )))?;

    for freed in &qubits {
        self.downstream_qubit_data.remove(freed);
    }
    self.downstream_qubit_ref_generator.free(qubits);
    Ok(())
}
/// Sends a gate downstream.
///
/// All target, control and measured qubits must be allocated. For gates
/// that measure qubits, the sequence number of the request is recorded as
/// the last mutation of every measured qubit, and the set of measured
/// qubits is registered as expected measurement results.
///
/// # Errors
///
/// Returns an invalid-operation error for backends and while a gatestream
/// response is being handled, an invalid-argument error when a qubit is not
/// allocated, or the error from sending the request.
pub fn gate(&mut self, gate: Gate) -> Result<()> {
    if self.definition.get_type() == PluginType::Backend {
        return inv_op("gate() is not available for backends");
    }
    if !self.synchronized_to_rpcs {
        return inv_op("gate() cannot be called while handling a gatestream response");
    }
    self.check_qubits_live(gate.get_targets())?;
    self.check_qubits_live(gate.get_controls())?;
    self.check_qubits_live(gate.get_measures())?;

    // The gate is moved into the request, so remember what it measures.
    let measured: HashSet<_> = gate.get_measures().iter().cloned().collect();

    let request = GatestreamDown::Pipelined(
        self.downstream_sequence_tx.get_next(),
        PipelinedGatestreamDown::Gate(gate),
    );
    self.connection.send(OutgoingMessage::Downstream(request))?;

    let sequence = self.downstream_sequence_tx.get_previous();
    for qubit in &measured {
        let data = self.downstream_qubit_data.get_mut(qubit).unwrap();
        data.last_mutation = sequence;
    }
    if !measured.is_empty() {
        self.downstream_expected_measurements
            .push_back((sequence, measured));
    }
    Ok(())
}
/// Returns the latest measurement result for the given downstream qubit.
///
/// Before the cached result is read, downstream is synchronized up to the
/// last gate that measured the qubit.
///
/// # Errors
///
/// Returns an invalid-operation error for backends and while a gatestream
/// response is being handled, an invalid-argument error when the qubit is
/// not allocated or has not been measured yet, or the error from
/// synchronizing with downstream.
pub fn get_measurement(&mut self, qubit: QubitRef) -> Result<QubitMeasurementResult> {
    if self.definition.get_type() == PluginType::Backend {
        return inv_op("get_measurement() is not available for backends");
    }
    if !self.synchronized_to_rpcs {
        return inv_op(
            "get_measurement() cannot be called while handling a gatestream response",
        );
    }

    let last_mutation = match self.downstream_qubit_data.get(&qubit) {
        Some(data) => data.last_mutation,
        None => return inv_arg(format!("qubit {} is not allocated", qubit)),
    };
    self.synchronize_downstream_up_to(last_mutation)?;

    let data = &self.downstream_qubit_data[&qubit];
    match data.measurement.as_ref() {
        Some(measurement) => Ok(QubitMeasurementResult::new(
            qubit,
            measurement.value,
            measurement.data.clone(),
        )),
        None => inv_arg(format!("qubit {} has not been measured yet", qubit)),
    }
}
/// Returns the number of cycles between the latest measurement of the given
/// qubit and the current (sent) downstream simulation time.
///
/// Before the cached measurement is read, downstream is synchronized up to
/// the last gate that measured the qubit.
///
/// # Errors
///
/// Returns an invalid-operation error for backends and while a gatestream
/// response is being handled, an invalid-argument error when the qubit is
/// not allocated or has not been measured yet, or the error from
/// synchronizing with downstream.
///
/// # Panics
///
/// Panics when the measurement timestamp lies after the current time.
pub fn get_cycles_since_measure(&mut self, qubit: QubitRef) -> Result<u64> {
    if self.definition.get_type() == PluginType::Backend {
        return inv_op("get_cycles_since_measure() is not available for backends");
    }
    if !self.synchronized_to_rpcs {
        return inv_op(
            "get_cycles_since_measure() cannot be called while handling a gatestream response",
        );
    }

    let last_mutation = match self.downstream_qubit_data.get(&qubit) {
        Some(data) => data.last_mutation,
        None => return inv_arg(format!("qubit {} is not allocated", qubit)),
    };
    self.synchronize_downstream_up_to(last_mutation)?;

    let data = &self.downstream_qubit_data[&qubit];
    match data.measurement.as_ref() {
        Some(measurement) => {
            let delta = self.downstream_cycle_tx - measurement.timestamp;
            assert!(delta >= 0);
            Ok(delta as u64)
        }
        None => inv_arg(format!("qubit {} has not been measured yet", qubit)),
    }
}
/// Returns the number of cycles between the two most recent measurements of
/// the given qubit.
///
/// Before the cached measurement is read, downstream is synchronized up to
/// the last gate that measured the qubit.
///
/// # Errors
///
/// Returns an invalid-operation error for backends and while a gatestream
/// response is being handled, an invalid-argument error when the qubit is
/// not allocated or has been measured fewer than two times, or the error
/// from synchronizing with downstream.
pub fn get_cycles_between_measures(&mut self, qubit: QubitRef) -> Result<u64> {
    if self.definition.get_type() == PluginType::Backend {
        return inv_op("get_cycles_between_measures() is not available for backends");
    }
    if !self.synchronized_to_rpcs {
        return inv_op("get_cycles_between_measures() cannot be called while handling a gatestream response");
    }

    let last_mutation = match self.downstream_qubit_data.get(&qubit) {
        Some(data) => data.last_mutation,
        None => return inv_arg(format!("qubit {} is not allocated", qubit)),
    };
    self.synchronize_downstream_up_to(last_mutation)?;

    let data = &self.downstream_qubit_data[&qubit];
    match data.measurement.as_ref() {
        None => inv_arg(format!("qubit {} has not been measured yet", qubit)),
        Some(measurement) => match measurement.timer {
            Some(timer) => Ok(timer),
            None => inv_arg(format!("qubit {} has only been measured once", qubit)),
        },
    }
}
/// Advances the downstream simulation time by `cycles` and returns the new
/// simulation time.
///
/// The local (sent) time is updated first, then an advance request is sent
/// downstream.
///
/// # Errors
///
/// Returns an invalid-operation error for backends and while a gatestream
/// response is being handled, or the error from sending the request.
pub fn advance(&mut self, cycles: Cycles) -> Result<Cycle> {
    if self.definition.get_type() == PluginType::Backend {
        return inv_op("advance() is not available for backends");
    }
    if !self.synchronized_to_rpcs {
        return inv_op("advance() cannot be called while handling a gatestream response");
    }

    self.downstream_cycle_tx = self.downstream_cycle_tx.advance(cycles);

    let sequence = self.downstream_sequence_tx.get_next();
    let request = GatestreamDown::Pipelined(sequence, PipelinedGatestreamDown::Advance(cycles));
    self.connection.send(OutgoingMessage::Downstream(request))?;

    Ok(self.downstream_cycle_tx)
}
/// Returns the current downstream simulation time, as advanced by the
/// `advance()` requests sent so far.
///
/// # Errors
///
/// Returns an invalid-operation error for backends and while a gatestream
/// response is being handled.
pub fn get_cycle(&self) -> Result<Cycle> {
    if self.definition.get_type() == PluginType::Backend {
        return inv_op("get_cycle() is not available for backends");
    }
    if !self.synchronized_to_rpcs {
        return inv_op("get_cycle() cannot be called while handling a gatestream response");
    }
    Ok(self.downstream_cycle_tx)
}
/// Sends an arbitrary command downstream and waits for its result.
///
/// Downstream is synchronized before the command is sent, so the next
/// downstream message is expected to be the response to the command.
///
/// # Errors
///
/// Returns an invalid-operation error for backends and while a gatestream
/// response is being handled, the failure reported by downstream, a
/// protocol error when downstream answers with anything else, or
/// "Simulation aborted" when no response can be received.
///
/// # Panics
///
/// Panics when the connection hands out a message that does not originate
/// from downstream.
pub fn arb(&mut self, cmd: ArbCmd) -> Result<ArbData> {
    if self.definition.get_type() == PluginType::Backend {
        return inv_op("arb() is not available for backends");
    }
    if !self.synchronized_to_rpcs {
        return inv_op("arb() cannot be called while handling a gatestream response");
    }

    self.synchronize_downstream()?;
    let request = GatestreamDown::ArbRequest(cmd);
    self.connection.send(OutgoingMessage::Downstream(request))?;

    let reply = self.connection.next_downstream_request()?;
    match reply {
        None => err("Simulation aborted"),
        Some(IncomingMessage::Downstream(GatestreamUp::ArbSuccess(data))) => Ok(data),
        Some(IncomingMessage::Downstream(GatestreamUp::ArbFailure(message))) => err(message),
        Some(IncomingMessage::Downstream(_)) => {
            err("Protocol error: unexpected message from downstream")
        }
        Some(_) => panic!("next_downstream_request() returned a non-downstream message"),
    }
}
/// Draws a `u64` from the currently selected random number stream.
///
/// # Panics
///
/// Panics when the random number generator has not been seeded yet.
pub fn random_u64(&mut self) -> u64 {
    let rng = self.rng.as_mut().expect("RNG not initialized");
    rng.random_u64()
}
/// Draws an `f64` from the currently selected random number stream.
///
/// # Panics
///
/// Panics when the random number generator has not been seeded yet.
pub fn random_f64(&mut self) -> f64 {
    let rng = self.rng.as_mut().expect("RNG not initialized");
    rng.random_f64()
}
}