mod unit_generator;
mod unsupported_tracker;
mod value_tracker;
use hugr::core::HugrNode;
use hugr_core::hugr::internal::PortgraphNodeMap;
use tket_json_rs::clexpr::InputClRegister;
use tket_json_rs::opbox::BoxID;
pub use value_tracker::{
TrackedBit, TrackedParam, TrackedQubit, TrackedValue, TrackedValues, ValueTracker,
};
use hugr::ops::{OpTrait, OpType};
use hugr::types::EdgeKind;
use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, RwLock};
use hugr::{Direction, HugrView, OutgoingPort, Wire};
use itertools::Itertools;
use tket_json_rs::circuit_json::{self, SerialCircuit};
use unsupported_tracker::UnsupportedTracker;
use super::opaque::OpaqueSubgraphs;
use super::{PytketEncodeError, PytketEncodeOpError};
use crate::metadata;
use crate::serialize::pytket::circuit::{
AdditionalNodesAndWires, AdditionalSubgraph, EncodedCircuitInfo,
};
use crate::serialize::pytket::config::PytketEncoderConfig;
use crate::serialize::pytket::extension::RegisterCount;
use crate::serialize::pytket::opaque::{OpaqueSubgraph, OpaqueSubgraphPayload};
/// State carried while encoding a HUGR region into a pytket circuit.
///
/// Holds the commands emitted so far together with the trackers that map
/// HUGR wires to pytket registers and parameters.
#[derive(derive_more::Debug)]
#[debug(bounds(H: HugrView))]
pub struct PytketEncoderContext<H: HugrView> {
/// Name given to the output circuit. Set from the region's function name
/// when the region is a function definition or declaration with a non-empty name.
name: Option<String>,
/// Phase passed to the output circuit; `"0"` when the region has no phase metadata.
phase: String,
/// Commands emitted so far, in emission order.
commands: Vec<circuit_json::Command>,
/// Tracker for the values associated with wires of the region.
pub values: ValueTracker<H::Node>,
/// Nodes that could not be encoded directly; they are later emitted as opaque subgraphs.
unsupported: UnsupportedTracker<H::Node>,
/// Registry of the opaque subgraphs referenced while encoding.
opaque_subgraphs: OpaqueSubgraphs<H::Node>,
/// Opaque subgraphs that carry no qubit or bit values, recorded here instead
/// of being emitted as commands.
non_emitted_subgraphs: Vec<AdditionalSubgraph>,
/// Encoder configuration, shared with nested encoders.
config: Arc<PytketEncoderConfig<H>>,
/// Per-function encoding results, shared with nested encoders.
function_cache: Arc<RwLock<HashMap<H::Node, CachedEncodedFunction>>>,
}
/// Optional hooks that customise how the outputs of an emitted command are
/// registered. Each hook is an `FnOnce`, so it is invoked at most once.
#[derive(Default)]
#[expect(clippy::type_complexity)]
pub struct EmitCommandOptions<'a> {
/// Maps the input qubits to the qubits reused for the outputs.
/// When unset, all input qubits are reused in order.
reuse_qubits_fn: Option<Box<dyn FnOnce(&[TrackedQubit]) -> Vec<TrackedQubit> + 'a>>,
/// Maps the input bits to the bits reused for the outputs.
/// When unset, all input bits are reused in order.
reuse_bits_fn: Option<Box<dyn FnOnce(&[TrackedBit]) -> Vec<TrackedBit> + 'a>>,
/// Computes the output parameter expressions.
/// When unset, no output parameters are produced.
output_params_fn: Option<Box<dyn FnOnce(OutputParamArgs<'_>) -> Vec<String> + 'a>>,
}
impl<'a> EmitCommandOptions<'a> {
    /// Creates an option set with no hooks installed.
    pub fn new() -> Self {
        Self::default()
    }

    /// Installs the hook that selects which input qubits are reused as outputs.
    pub fn reuse_qubits(
        self,
        reuse_qubits: impl FnOnce(&[TrackedQubit]) -> Vec<TrackedQubit> + 'a,
    ) -> Self {
        Self {
            reuse_qubits_fn: Some(Box::new(reuse_qubits)),
            ..self
        }
    }

    /// Installs the hook that selects which input bits are reused as outputs.
    pub fn reuse_bits(
        self,
        reuse_bits: impl FnOnce(&[TrackedBit]) -> Vec<TrackedBit> + 'a,
    ) -> Self {
        Self {
            reuse_bits_fn: Some(Box::new(reuse_bits)),
            ..self
        }
    }

    /// Reuses every input bit, in order, for the outputs.
    pub fn reuse_all_bits(self) -> Self {
        self.reuse_bits(|bits| bits.to_vec())
    }

    /// Installs the hook that computes the output parameter expressions.
    pub fn output_params(
        self,
        output_params: impl FnOnce(OutputParamArgs<'_>) -> Vec<String> + 'a,
    ) -> Self {
        Self {
            output_params_fn: Some(Box::new(output_params)),
            ..self
        }
    }
}
impl<H: HugrView> PytketEncoderContext<H> {
/// Builds a fresh encoder context for `region`.
///
/// The circuit name is taken from the region's function name (when it is a
/// non-empty `FuncDefn`/`FuncDecl` name) and the phase from the region's
/// phase metadata, defaulting to `"0"`.
///
/// # Errors
///
/// Propagates any error raised while creating the value tracker.
pub(super) fn new(
    hugr: &H,
    region: H::Node,
    opaque_subgraphs: OpaqueSubgraphs<H::Node>,
    config: impl Into<Arc<PytketEncoderConfig<H>>>,
) -> Result<Self, PytketEncodeError<H::Node>> {
    let config: Arc<PytketEncoderConfig<H>> = config.into();

    let declared_name = match hugr.get_optype(region) {
        OpType::FuncDefn(defn) => Some(defn.func_name()),
        OpType::FuncDecl(decl) => Some(decl.func_name()),
        _ => None,
    };
    let name = declared_name.filter(|n| !n.is_empty()).cloned();

    let phase = hugr
        .get_metadata::<metadata::Phase>(region)
        .map_or_else(|| "0".to_string(), |p| p.to_string());

    // Built before `config` is moved into the struct below.
    let values = ValueTracker::new(hugr, region, &config)?;

    Ok(Self {
        name,
        phase,
        commands: Vec::new(),
        values,
        unsupported: UnsupportedTracker::new(hugr),
        opaque_subgraphs,
        non_emitted_subgraphs: Vec::new(),
        config,
        function_cache: Arc::new(RwLock::new(HashMap::new())),
    })
}
/// Encodes every node of `region`, visiting them in topological order.
///
/// When `region` is a direct child of the module root, it is first marked in
/// the shared function cache as currently being encoded.
///
/// # Errors
///
/// Returns an error if the function cache lock cannot be acquired for
/// writing, or if encoding any node fails.
pub(super) fn run_encoder(
    &mut self,
    hugr: &H,
    region: H::Node,
) -> Result<(), PytketEncodeError<H::Node>> {
    if hugr.get_parent(region) == Some(hugr.module_root()) {
        let Ok(mut cache) = self.function_cache.write() else {
            return Err(PytketEncodeError::custom("Detected encoder worker panic."));
        };
        cache.insert(region, CachedEncodedFunction::InEncodingStack);
    }

    // The portgraph view is borrowed by both the traversal constructor and
    // each `next` call.
    let (region_graph, node_map) = hugr.region_portgraph(region);
    let mut topo = petgraph::visit::Topo::new(&region_graph);
    while let Some(pg_node) = topo.next(&region_graph) {
        let node = node_map.from_portgraph(pg_node);
        self.try_encode_node(node, hugr)?;
    }
    Ok(())
}
/// Consumes the context and assembles the encoded circuit.
///
/// Any nodes still recorded as unsupported are emitted first. Returns the
/// encoded circuit information together with the opaque subgraph registry.
///
/// # Errors
///
/// Propagates errors from emitting the pending unsupported nodes or from
/// finishing the value tracker.
#[expect(clippy::type_complexity)]
pub(super) fn finish(
    mut self,
    hugr: &H,
    region: H::Node,
) -> Result<(EncodedCircuitInfo, OpaqueSubgraphs<H::Node>), PytketEncodeError<H::Node>> {
    // Flush the remaining unsupported nodes, one connected component at a time.
    while !self.unsupported.is_empty() {
        let seed = self.unsupported.iter().next().unwrap();
        let component = self.unsupported.extract_component(seed, hugr)?;
        self.emit_unsupported(&component, hugr)?;
    }

    let tracked = self.values.finish(hugr, region)?;

    let mut serial_circuit = SerialCircuit::new(self.name, self.phase);
    serial_circuit.commands = self.commands;
    serial_circuit.qubits = tracked.qubits.into_iter().map_into().collect();
    serial_circuit.bits = tracked.bits.into_iter().map_into().collect();
    serial_circuit.implicit_permutation = tracked.qubit_permutation;
    serial_circuit.number_of_ws = None;

    let additional_nodes_and_wires = AdditionalNodesAndWires {
        additional_subgraphs: self.non_emitted_subgraphs,
        straight_through_wires: tracked.straight_through_wires,
    };
    let info = EncodedCircuitInfo {
        serial_circuit,
        input_params: tracked.input_params,
        output_params: tracked.params,
        additional_nodes_and_wires,
        output_qubits: tracked.qubit_outputs,
        output_bits: tracked.bit_outputs,
    };
    Ok((info, self.opaque_subgraphs))
}
/// Returns the encoder configuration used by this context.
pub fn config(&self) -> &PytketEncoderConfig<H> {
    &*self.config
}
/// Returns the tracked values associated with `wire`.
///
/// If the wire has no values yet but its source node is recorded as
/// unsupported, the unsupported component containing that node is emitted
/// first and the lookup is retried.
///
/// # Errors
///
/// Returns `WireHasNoValues` when the wire has no values and its source is
/// not an unsupported node; propagates errors from emitting the component.
pub fn get_wire_values(
    &mut self,
    wire: Wire<H::Node>,
    hugr: &H,
) -> Result<Cow<'_, [TrackedValue]>, PytketEncodeError<H::Node>> {
    if self.values.peek_wire_values(wire).is_none() {
        let source = wire.node();
        if !self.unsupported.is_unsupported(source) {
            return Err(PytketEncodeOpError::WireHasNoValues { wire }.into());
        }
        let component = self.unsupported.extract_component(source, hugr)?;
        self.emit_unsupported(&component, hugr)?;
        debug_assert!(!self.unsupported.is_unsupported(wire.node()));
        return self.get_wire_values(wire, hugr);
    }
    Ok(self.values.wire_values(wire).unwrap())
}
/// Collects the tracked values of all inputs of `node`.
///
/// # Errors
///
/// Fails with `WireHasNoValues` if any input wire has no tracked values.
pub fn get_input_values(
    &mut self,
    node: H::Node,
    hugr: &H,
) -> Result<TrackedValues, PytketEncodeError<H::Node>> {
    let inputs = self.get_input_values_internal(node, hugr, |_| true)?;
    inputs.try_into_tracked_values()
}
/// Collects the tracked values for the inputs of `node` accepted by
/// `wire_filter`, skipping the node's "other" input port.
///
/// Wires without tracked values are reported in `unknown_values` instead of
/// failing; any other error is propagated.
///
/// # Panics
///
/// Panics if an input port does not have exactly one linked output.
fn get_input_values_internal(
    &mut self,
    node: H::Node,
    hugr: &H,
    wire_filter: impl Fn(Wire<H::Node>) -> bool,
) -> Result<NodeInputValues<H::Node>, PytketEncodeError<H::Node>> {
    let skipped_port = hugr.get_optype(node).other_input_port();
    let mut inputs = NodeInputValues {
        tracked_values: TrackedValues::default(),
        unknown_values: Vec::new(),
    };

    let ports = hugr
        .node_inputs(node)
        .filter(|&port| Some(port) != skipped_port);
    for port in ports {
        let (src_node, src_port) = hugr
            .single_linked_output(node, port)
            .expect("Dataflow input port should have a single neighbour");
        let wire = Wire::new(src_node, src_port);
        if !wire_filter(wire) {
            continue;
        }
        match self.get_wire_values(wire, hugr) {
            Ok(values) => inputs.tracked_values.extend(values.iter().copied()),
            Err(PytketEncodeError::OpEncoding(PytketEncodeOpError::WireHasNoValues {
                wire,
            })) => inputs.unknown_values.push(wire),
            Err(other) => return Err(other),
        }
    }
    Ok(inputs)
}
/// Emits a pytket command of type `pytket_optype` for `node`.
///
/// The operation is built with [`make_tk1_operation`] from the node's
/// register counts and parameters.
pub fn emit_node(
    &mut self,
    pytket_optype: tket_json_rs::OpType,
    node: H::Node,
    hugr: &H,
    options: EmitCommandOptions,
) -> Result<(), PytketEncodeError<H::Node>> {
    self.emit_node_command(node, hugr, options, move |op_args| {
        let operation = make_tk1_operation(pytket_optype, op_args);
        operation
    })
}
/// Emits a pytket command for `node`, building the operation with
/// `make_operation`.
///
/// The node's input values are collected, its outputs are registered
/// according to `options`, and the command is emitted over the input
/// registers followed by any newly created output registers. The command's
/// opgroup is read from the node's `OpGroup` metadata, if present.
///
/// # Errors
///
/// Propagates errors from collecting the inputs or registering the outputs.
pub fn emit_node_command(
    &mut self,
    node: H::Node,
    hugr: &H,
    options: EmitCommandOptions,
    make_operation: impl FnOnce(MakeOperationArgs<'_>) -> circuit_json::Operation,
) -> Result<(), PytketEncodeError<H::Node>> {
    let TrackedValues {
        mut qubits,
        mut bits,
        params,
    } = self.get_input_values(node, hugr)?;
    // Resolve the parameter handles into their string expressions.
    let params: Vec<String> = params
        .into_iter()
        .map(|p| self.values.param_expression(p).to_owned())
        .collect();

    let new_outputs =
        self.register_node_outputs(node, hugr, &qubits, &bits, &params, options, |_| true)?;
    // Registers created for the outputs are appended after the inputs.
    qubits.extend(new_outputs.qubits);
    bits.extend(new_outputs.bits);

    let opgroup: Option<String> = hugr
        .get_metadata::<metadata::OpGroup>(node)
        .map(ToString::to_string);
    let args = MakeOperationArgs {
        num_qubits: qubits.len(),
        num_bits: bits.len(),
        params: Cow::Borrowed(&params),
    };
    let op = make_operation(args);
    self.emit_command(op, &qubits, &bits, opgroup);
    Ok(())
}
pub fn emit_transparent_node(
&mut self,
node: H::Node,
hugr: &H,
output_params: impl FnOnce(OutputParamArgs<'_>) -> Vec<String>,
) -> Result<(), PytketEncodeError<H::Node>> {
let input_values = self.get_input_values(node, hugr)?;
let output_counts = self.node_output_values(node, hugr)?;
let total_out_count: RegisterCount = output_counts.iter().map(|(_, c)| *c).sum();
let input_params: Vec<String> = input_values
.params
.into_iter()
.map(|p| self.values.param_expression(p).to_owned())
.collect_vec();
let out_params = output_params(OutputParamArgs {
expected_count: total_out_count.params,
input_params: &input_params,
});
if input_values.qubits.len() != total_out_count.qubits {
return Err(PytketEncodeError::custom(format!(
"Mismatched number of input and output qubits while trying to emit a transparent operation for {}. We have {} inputs but {} outputs.",
hugr.get_optype(node),
input_values.qubits.len(),
total_out_count.qubits,
)));
}
if input_values.bits.len() != total_out_count.bits {
return Err(PytketEncodeError::custom(format!(
"Mismatched number of input and output bits while trying to emit a transparent operation for {}. We have {} inputs but {} outputs.",
hugr.get_optype(node),
input_values.bits.len(),
total_out_count.bits,
)));
}
if out_params.len() != total_out_count.params {
return Err(PytketEncodeError::custom(format!(
"Expected {} parameters in the input values for a {}, but got {}.",
total_out_count.params,
hugr.get_optype(node),
out_params.len()
)));
}
let mut qubits = input_values.qubits.into_iter();
let mut bits = input_values.bits.into_iter();
let mut params = out_params.into_iter();
for (wire, count) in output_counts {
let mut values: Vec<TrackedValue> = Vec::with_capacity(count.total());
values.extend(qubits.by_ref().take(count.qubits).map(TrackedValue::Qubit));
values.extend(bits.by_ref().take(count.bits).map(TrackedValue::Bit));
for p in params.by_ref().take(count.params) {
values.push(self.values.new_param(p).into());
}
self.values.register_wire(wire, values, hugr)?;
}
Ok(())
}
/// Emits an opaque subgraph of unsupported nodes.
///
/// The subgraph is registered in the opaque subgraph registry and its
/// supported outputs are registered as wire values. When the subgraph touches
/// at least one qubit or bit, a `Barrier` command carrying the serialized
/// payload is emitted; otherwise it is recorded in `non_emitted_subgraphs`.
///
/// # Panics
///
/// Panics if an incoming port does not have exactly one linked output, or if
/// the payload cannot be serialized to JSON.
fn emit_unsupported(
&mut self,
subgraph: &OpaqueSubgraph<H::Node>,
hugr: &H,
) -> Result<(), PytketEncodeError<H::Node>> {
let subgraph_id = self
.opaque_subgraphs
.register_opaque_subgraph(subgraph.clone());
// Gather the tracked values arriving at the subgraph boundary.
let mut op_values = TrackedValues::default();
for (node, port) in subgraph.incoming_ports().iter() {
let (neigh, neigh_out) = hugr
.single_linked_output(*node, *port)
.expect("Dataflow input port should have a single neighbour");
let wire = Wire::new(neigh, neigh_out);
// Inputs whose values cannot be retrieved are skipped rather than reported.
let Ok(tracked_values) = self.get_wire_values(wire, hugr) else {
continue;
};
op_values.extend(tracked_values.iter().cloned());
}
// Move the input parameters out of `op_values` and resolve them to expressions.
let input_param_exprs: Vec<String> = std::mem::take(&mut op_values.params)
.into_iter()
.map(|p| self.values.param_expression(p).to_owned())
.collect();
let payload = OpaqueSubgraphPayload::new_external(subgraph_id, input_param_exprs.clone());
// Running index of the next output parameter of the subgraph.
let mut out_param_count = 0;
let input_qubits = op_values.qubits.clone();
let input_bits = op_values.bits.clone();
// Slices of input registers still available for reuse by the outputs;
// `register_port_output` advances them as it consumes registers.
let mut out_qubits = input_qubits.as_slice();
let mut out_bits = input_bits.as_slice();
for ((out_node, out_port), ty) in subgraph
.outgoing_ports()
.iter()
.zip(subgraph.signature().output().iter())
{
// Outputs whose type has no pytket representation are not registered.
if self.config().type_to_pytket(ty).is_none() {
continue;
}
let new_outputs = self.register_port_output(
*out_node,
*out_port,
hugr,
&mut out_qubits,
&mut out_bits,
&input_param_exprs,
|p| {
// Name the output parameters with consecutive indices across all ports.
let range = out_param_count..out_param_count + p.expected_count;
out_param_count += p.expected_count;
range.map(|i| subgraph_id.output_parameter(i)).collect_vec()
},
)?;
op_values.append(new_outputs);
}
if op_values.qubits.is_empty() && op_values.bits.is_empty() {
// Nothing to attach a command to; record the subgraph on the side.
self.non_emitted_subgraphs.push(AdditionalSubgraph {
id: subgraph_id,
params: input_param_exprs.clone(),
});
} else {
let args = MakeOperationArgs {
num_qubits: op_values.qubits.len(),
num_bits: op_values.bits.len(),
params: Cow::Borrowed(&[]),
};
// The payload is stored as JSON in the barrier's `data` field.
let mut pytket_op = make_tk1_operation(tket_json_rs::OpType::Barrier, args);
pytket_op.data = Some(serde_json::to_string(&payload).unwrap());
self.emit_command(pytket_op, &op_values.qubits, &op_values.bits, None);
}
Ok(())
}
/// Appends a command for `pytket_op` acting on the given qubits and bits.
///
/// The command arguments are the registers of `qubits` followed by the
/// registers of `bits`.
pub fn emit_command(
    &mut self,
    pytket_op: circuit_json::Operation,
    qubits: &[TrackedQubit],
    bits: &[TrackedBit],
    opgroup: Option<String>,
) {
    let mut args = Vec::with_capacity(qubits.len() + bits.len());
    for &qb in qubits {
        args.push(self.values.qubit_register(qb).clone());
    }
    for &bit in bits {
        args.push(self.values.bit_register(bit).clone());
    }
    self.commands.push(circuit_json::Command {
        op: pytket_op,
        args,
        opgroup,
    });
}
/// Encodes the region under `node` with a nested encoder and emits it as a
/// `CircBox` command.
///
/// Returns `EncodeStatus::Unsupported` without emitting anything when the
/// nested circuit produces output parameters.
///
/// # Errors
///
/// Propagates errors from the nested encoder or from emitting the box.
#[expect(unused)]
fn emit_subcircuit(
    &mut self,
    node: H::Node,
    hugr: &H,
) -> Result<EncodeStatus, PytketEncodeError<H::Node>> {
    let config = Arc::clone(&self.config);

    // Lend the opaque subgraph registry to the nested encoder.
    let opaque_subgraphs = std::mem::take(&mut self.opaque_subgraphs);
    let mut subencoder = PytketEncoderContext::new(hugr, node, opaque_subgraphs, config)?;
    subencoder.function_cache = Arc::clone(&self.function_cache);
    subencoder.run_encoder(hugr, node)?;
    let (info, opaque_subgraphs) = subencoder.finish(hugr, node)?;

    // Take the registry back before any return, so that the subgraphs already
    // registered by this context are not lost when the region is unsupported.
    self.opaque_subgraphs = opaque_subgraphs;

    if !info.output_params.is_empty() {
        return Ok(EncodeStatus::Unsupported);
    }
    self.emit_circ_box(node, info.serial_circuit, hugr)?;
    Ok(EncodeStatus::Success)
}
/// Emits a call to `function` at `node` as a `CircBox` command, reusing the
/// shared function cache when the function has already been processed.
///
/// Returns `EncodeStatus::Unsupported` when the cached entry is `Unsupported`
/// or `InEncodingStack`, or when the freshly encoded function produces output
/// parameters.
///
/// # Errors
///
/// Propagates errors from the nested encoder or from emitting the box.
#[expect(unused)]
fn emit_function_call(
&mut self,
node: H::Node,
function: H::Node,
hugr: &H,
) -> Result<EncodeStatus, PytketEncodeError<H::Node>> {
// A failed read-lock acquisition is treated like a cache miss.
let cache = self.function_cache.read().ok();
if let Some(encoded) = cache.as_ref().and_then(|c| c.get(&function)) {
// Copy the entry out so the read guard can be released before emitting.
let encoded = encoded.clone();
drop(cache);
match encoded {
CachedEncodedFunction::Encoded { serial_circuit } => {
self.emit_circ_box(node, serial_circuit, hugr)?;
return Ok(EncodeStatus::Success);
}
CachedEncodedFunction::Unsupported | CachedEncodedFunction::InEncodingStack => {
return Ok(EncodeStatus::Unsupported);
}
};
}
// Release the read guard before the nested encoder takes the write lock.
drop(cache);
let config = Arc::clone(&self.config);
// Lend the opaque subgraph registry to the nested encoder and take it back afterwards.
let opaque_subgraphs = std::mem::take(&mut self.opaque_subgraphs);
let mut subencoder = PytketEncoderContext::new(hugr, function, opaque_subgraphs, config)?;
subencoder.function_cache = self.function_cache.clone();
subencoder.run_encoder(hugr, function)?;
let (info, opaque_subgraphs) = subencoder.finish(hugr, function)?;
self.opaque_subgraphs = opaque_subgraphs;
// A function is only cached as encoded when it has no output parameters.
let (result, cached_fn) = match info.output_params.is_empty() {
true => (
EncodeStatus::Success,
CachedEncodedFunction::Encoded {
serial_circuit: info.serial_circuit.clone(),
},
),
false => (
EncodeStatus::Unsupported,
CachedEncodedFunction::Unsupported,
),
};
// Best effort: the result is simply not cached if the write lock is unavailable.
if let Ok(mut cache) = self.function_cache.write() {
cache.insert(function, cached_fn);
}
if result == EncodeStatus::Success {
self.emit_circ_box(node, info.serial_circuit, hugr)?;
}
Ok(result)
}
/// Emits a `CircBox` command for `node` wrapping `boxed_circuit`.
///
/// All input bits are reused for the outputs, and the box gets a fresh id.
fn emit_circ_box(
    &mut self,
    node: H::Node,
    boxed_circuit: SerialCircuit,
    hugr: &H,
) -> Result<(), PytketEncodeError<H::Node>> {
    let options = EmitCommandOptions::new().reuse_all_bits();
    self.emit_node_command(node, hugr, options, |op_args| {
        let mut box_op = make_tk1_operation(tket_json_rs::OpType::CircBox, op_args);
        box_op.op_box = Some(tket_json_rs::opbox::OpBox::CircBox {
            id: BoxID::new(),
            circuit: boxed_circuit,
        });
        box_op
    })
}
/// Tries to encode a single node, recording it as unsupported on failure.
///
/// Extension operations (without order edges), supported constants and
/// constant loads are encoded; `Input` and `Output` nodes are accepted
/// without emitting anything. Every other node, and any node whose non-local
/// inputs cannot be encoded, is recorded in the unsupported tracker.
fn try_encode_node(
    &mut self,
    node: H::Node,
    hugr: &H,
) -> Result<EncodeStatus, PytketEncodeError<H::Node>> {
    let optype = hugr.get_optype(node);

    if self.encode_nonlocal_inputs(node, optype, hugr)? == EncodeStatus::Unsupported {
        self.unsupported.record_node(node, hugr);
        return Ok(EncodeStatus::Unsupported);
    }

    let encoded = match optype {
        OpType::ExtensionOp(op) => {
            if self.has_order_edges(node, optype, hugr) {
                false
            } else {
                // Clone the handle so the config can borrow `self` mutably.
                let config = Arc::clone(&self.config);
                config.op_to_pytket(node, op, hugr, self)? == EncodeStatus::Success
            }
        }
        OpType::LoadConstant(constant) => {
            let supported = self
                .config()
                .type_to_pytket(constant.constant_type())
                .is_some();
            if supported {
                self.emit_transparent_node(node, hugr, |ps| ps.input_params.to_owned())?;
            }
            supported
        }
        OpType::Const(op) => {
            if self.config().type_to_pytket(&op.get_type()).is_none() {
                false
            } else {
                let config = Arc::clone(&self.config);
                match config.const_to_pytket(&op.value, self)? {
                    Some(values) => {
                        let wire = Wire::new(node, 0);
                        self.values.register_wire(wire, values.into_iter(), hugr)?;
                        true
                    }
                    None => false,
                }
            }
        }
        OpType::Input(_) | OpType::Output(_) => true,
        _ => false,
    };

    if encoded {
        return Ok(EncodeStatus::Success);
    }
    self.unsupported.record_node(node, hugr);
    Ok(EncodeStatus::Unsupported)
}
/// Makes sure the inputs of `node` that come from outside its parent region
/// have tracked values.
///
/// Only sources that are direct children of the module root are handled:
/// function definitions and declarations are registered with no values, and
/// constants are encoded in place. Any other non-local source makes the node
/// unsupported.
fn encode_nonlocal_inputs(
    &mut self,
    node: H::Node,
    optype: &OpType,
    hugr: &H,
) -> Result<EncodeStatus, PytketEncodeError<H::Node>> {
    let node_parent = hugr.get_parent(node);
    // Value inputs, plus the static input port when the operation has one.
    let port_count =
        optype.value_input_count() + usize::from(optype.static_input_port().is_some());
    let sources = hugr
        .node_inputs(node)
        .take(port_count)
        .flat_map(|port| hugr.linked_outputs(node, port));

    for (source, source_port) in sources {
        let wire = Wire::new(source, source_port);
        if self.values.peek_wire_values(wire).is_some() {
            continue;
        }
        let source_parent = hugr.get_parent(source);
        if source_parent == node_parent {
            continue;
        }
        if source_parent != Some(hugr.module_root()) {
            return Ok(EncodeStatus::Unsupported);
        }

        let status = match hugr.get_optype(source) {
            OpType::FuncDefn(_) | OpType::FuncDecl(_) => {
                self.values
                    .register_wire::<TrackedValue>(wire, vec![], hugr)?;
                EncodeStatus::Success
            }
            OpType::Const(_) => self.try_encode_node(source, hugr)?,
            _ => EncodeStatus::Unsupported,
        };
        if status == EncodeStatus::Unsupported {
            return Ok(EncodeStatus::Unsupported);
        }
    }
    Ok(EncodeStatus::Success)
}
/// Returns `true` if the "other" port of `node`, in either direction, is
/// linked to anything.
fn has_order_edges(&mut self, node: H::Node, optype: &OpType, hugr: &H) -> bool {
    [Direction::Incoming, Direction::Outgoing]
        .into_iter()
        .filter_map(|direction| optype.other_port(direction))
        .any(|port| hugr.is_linked(node, port))
}
/// Registers the values of the output wires of `node`.
///
/// Output qubits and bits are taken, in order, from the registers selected by
/// the reuse hooks in `options` (all inputs when a hook is unset); new
/// registers are created when those run out. Output parameters come from the
/// `output_params` hook. Wires rejected by `wire_filter` are skipped.
///
/// Returns only the newly created qubits, bits and parameters.
///
/// # Errors
///
/// Fails if the output parameter hook returns a number of expressions
/// different from the one expected by the output types, or if the outputs
/// cannot be computed or registered.
#[expect(clippy::too_many_arguments)]
fn register_node_outputs(
&mut self,
node: H::Node,
hugr: &H,
input_qubits: &[TrackedQubit],
input_bits: &[TrackedBit],
input_params: &[String],
options: EmitCommandOptions,
wire_filter: impl Fn(Wire<H::Node>) -> bool,
) -> Result<TrackedValues, PytketEncodeError<H::Node>> {
let output_counts = self.node_output_values(node, hugr)?;
let total_out_count: RegisterCount = output_counts.iter().map(|(_, c)| *c).sum();
// Registers offered for reuse by the outputs; defaults to all the inputs.
let output_qubits = match options.reuse_qubits_fn {
Some(f) => f(input_qubits),
None => input_qubits.to_vec(),
};
let output_bits = match options.reuse_bits_fn {
Some(f) => f(input_bits),
None => input_bits.to_vec(),
};
// Input qubits that are not offered for reuse are freed in the tracker.
let used_qubits: HashSet<TrackedQubit> = output_qubits.iter().copied().collect();
for qb in input_qubits {
if !used_qubits.contains(qb) {
self.values.free_qubit(*qb);
}
}
let out_params = match options.output_params_fn {
Some(f) => f(OutputParamArgs {
expected_count: total_out_count.params,
input_params,
}),
None => Vec::new(),
};
if out_params.len() != total_out_count.params {
return Err(PytketEncodeError::custom(format!(
"Expected {} parameters in the input values for a {}, but got {}.",
total_out_count.params,
hugr.get_optype(node),
out_params.len()
)));
}
// Shared cursors over the reusable registers and the output parameters;
// each output wire takes what it needs from the front.
let mut qubits = output_qubits.iter().copied();
let mut bits = output_bits.iter().copied();
let mut params = out_params.into_iter();
let mut new_outputs = TrackedValues::default();
for (wire, count) in output_counts {
if !wire_filter(wire) {
continue;
}
let mut out_wire_values = Vec::with_capacity(count.total());
out_wire_values.extend(qubits.by_ref().take(count.qubits).map(TrackedValue::Qubit));
// At this point the vector only holds qubits, so its length is the number reused.
for _ in out_wire_values.len()..count.qubits {
let qb = self.values.new_qubit();
new_outputs.qubits.push(qb);
out_wire_values.push(TrackedValue::Qubit(qb));
}
let non_bit_count = out_wire_values.len();
out_wire_values.extend(bits.by_ref().take(count.bits).map(TrackedValue::Bit));
// Create fresh bits for whatever could not be covered by reused ones.
let reused_bit_count = out_wire_values.len() - non_bit_count;
for _ in reused_bit_count..count.bits {
let b = self.values.new_bit();
new_outputs.bits.push(b);
out_wire_values.push(TrackedValue::Bit(b));
}
for expr in params.by_ref().take(count.params) {
let p = self.values.new_param(expr);
new_outputs.params.push(p);
out_wire_values.push(p.into());
}
self.values.register_wire(wire, out_wire_values, hugr)?;
}
Ok(new_outputs)
}
/// Registers the values of a single output port of `node`.
///
/// `qubits` and `bits` are cursors over the registers available for reuse:
/// the registers used by this port are removed from their front, and new
/// registers are created when not enough remain. `output_params_fn` provides
/// the expressions of the port's output parameters.
///
/// Returns only the newly created qubits, bits and parameters. Returns empty
/// values, registering nothing, when the port has no type in the node's
/// signature.
///
/// # Errors
///
/// Fails if the port type has no pytket representation, if
/// `output_params_fn` returns an unexpected number of expressions, or if the
/// wire cannot be registered.
#[expect(clippy::too_many_arguments)]
fn register_port_output(
&mut self,
node: H::Node,
port: OutgoingPort,
hugr: &H,
qubits: &mut &[TrackedQubit],
bits: &mut &[TrackedBit],
input_params: &[String],
output_params_fn: impl FnOnce(OutputParamArgs<'_>) -> Vec<String>,
) -> Result<TrackedValues, PytketEncodeError<H::Node>> {
let wire = Wire::new(node, port);
let Some(ty) = hugr
.signature(node)
.and_then(|s| s.out_port_type(port).cloned())
else {
return Ok(TrackedValues::default());
};
let Some(count) = self.config().type_to_pytket(&ty) else {
return Err(PytketEncodeError::custom(format!(
"Found an unsupported type {ty} while encoding {port} of {node}."
)));
};
let out_params = output_params_fn(OutputParamArgs {
expected_count: count.params,
input_params,
});
if out_params.len() != count.params {
return Err(PytketEncodeError::custom(format!(
"Expected {} parameters in the input values for a {} at {port} of {node}, but got {}.",
count.params,
hugr.get_optype(node),
out_params.len()
)));
}
let mut new_outputs = TrackedValues::default();
let mut out_wire_values = Vec::with_capacity(count.total());
// Take the needed qubits from the front of the cursor; if there are not
// enough, use all that remain and create the missing ones.
let output_qubits = match qubits.split_off(..count.qubits) {
Some(reused_qubits) => reused_qubits.to_vec(),
None => {
let mut head_qubits = qubits.to_vec();
*qubits = &[];
let new_qubits = (head_qubits.len()..count.qubits).map(|_| {
let q = self.values.new_qubit();
new_outputs.qubits.push(q);
q
});
head_qubits.extend(new_qubits);
head_qubits
}
};
out_wire_values.extend(output_qubits.iter().map(|&q| TrackedValue::Qubit(q)));
// Same strategy for the bits.
let output_bits = match bits.split_off(..count.bits) {
Some(reused_bits) => reused_bits.to_vec(),
None => {
let mut head_bits = bits.to_vec();
*bits = &[];
let new_bits = (head_bits.len()..count.bits).map(|_| {
let b = self.values.new_bit();
new_outputs.bits.push(b);
b
});
head_bits.extend(new_bits);
head_bits
}
};
out_wire_values.extend(output_bits.iter().map(|&b| TrackedValue::Bit(b)));
for expr in out_params.into_iter().take(count.params) {
let p = self.values.new_param(expr);
new_outputs.params.push(p);
out_wire_values.push(p.into());
}
self.values.register_wire(wire, out_wire_values, hugr)?;
Ok(new_outputs)
}
/// Lists the output wires of `node` together with the number of pytket
/// registers and parameters each of them carries.
///
/// The node's "other" output port is skipped. The static output port, if
/// any, is included using its constant type.
///
/// # Errors
///
/// Fails if the static output is not a constant edge, if a port has no type
/// in the dataflow signature, or if a type has no pytket representation.
///
/// # Panics
///
/// Panics if the operation reports a static output port but no static output.
#[expect(clippy::type_complexity)]
fn node_output_values(
    &self,
    node: H::Node,
    hugr: &H,
) -> Result<Vec<(Wire<H::Node>, RegisterCount)>, PytketEncodeError<H::Node>> {
    let op = hugr.get_optype(node);
    let signature = op.dataflow_signature();
    let static_output = op.static_output_port();
    let other_output = op.other_output_port();

    let mut wire_counts = Vec::with_capacity(hugr.num_outputs(node));
    for out_port in hugr.node_outputs(node) {
        if other_output == Some(out_port) {
            continue;
        }

        let ty = if static_output == Some(out_port) {
            match op.static_output().unwrap() {
                EdgeKind::Const(ty) => ty,
                _ => {
                    return Err(PytketEncodeError::custom(format!(
                        "Cannot emit a static output for a {op}."
                    )));
                }
            }
        } else {
            let port_type = signature
                .as_ref()
                .and_then(|s| s.out_port_type(out_port).cloned());
            match port_type {
                Some(ty) => ty,
                None => {
                    return Err(PytketEncodeError::custom(
                        "Cannot emit a transparent node without a dataflow signature.",
                    ));
                }
            }
        };

        let wire = hugr::Wire::new(node, out_port);
        match self.config().type_to_pytket(&ty) {
            Some(count) => wire_counts.push((wire, count)),
            None => {
                return Err(PytketEncodeError::custom(format!(
                    "Found an unsupported type {ty} while encoding a {op}."
                )));
            }
        }
    }
    Ok(wire_counts)
}
}
/// Outcome of an attempt to encode a node or region.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, derive_more::Display)]
pub enum EncodeStatus {
/// The operation was encoded.
Success,
/// The operation could not be encoded; callers in this file then record
/// the node as unsupported.
Unsupported,
}
/// Arguments passed to the callbacks that compute output parameter expressions.
#[derive(Clone, Copy, Debug)]
pub struct OutputParamArgs<'a> {
/// Number of parameter expressions the callback is expected to return.
pub expected_count: usize,
/// Expressions of the input parameters.
pub input_params: &'a [String],
}
/// Arguments passed to the callbacks that build a pytket operation.
#[derive(Clone, Debug)]
pub struct MakeOperationArgs<'a> {
/// Number of qubits the operation acts on.
pub num_qubits: usize,
/// Number of bits the operation acts on.
pub num_bits: usize,
/// Parameter expressions of the operation.
pub params: Cow<'a, [String]>,
}
/// Input values collected for a node, split into the values that are tracked
/// and the wires for which no values were found.
struct NodeInputValues<N> {
/// Tracked values found on the input wires.
pub tracked_values: TrackedValues,
/// Input wires that have no tracked values.
pub unknown_values: Vec<Wire<N>>,
}
impl<N: HugrNode> NodeInputValues<N> {
    /// Returns the tracked values when every input wire had values.
    ///
    /// # Errors
    ///
    /// Returns `WireHasNoValues` for the first unknown wire otherwise.
    pub fn try_into_tracked_values(self) -> Result<TrackedValues, PytketEncodeError<N>> {
        if let Some(&wire) = self.unknown_values.first() {
            return Err(PytketEncodeOpError::WireHasNoValues { wire }.into());
        }
        Ok(self.tracked_values)
    }
}
/// Entry of the shared function cache.
#[derive(Clone, Debug)]
enum CachedEncodedFunction {
/// The function was encoded into a circuit.
Encoded {
/// The encoded circuit, reused for every call to the function.
serial_circuit: SerialCircuit,
},
/// The function was processed but cannot be emitted as a circuit box.
Unsupported,
/// Encoding of the function has started; calls found in this state are
/// reported as unsupported.
InEncodingStack,
}
pub fn make_tk1_operation(
pytket_optype: tket_json_rs::OpType,
inputs: MakeOperationArgs<'_>,
) -> circuit_json::Operation {
let mut op = circuit_json::Operation::default();
op.op_type = pytket_optype;
op.n_qb = Some(inputs.num_qubits as u32);
op.params = match inputs.params.is_empty() {
false => Some(inputs.params.into_owned()),
true => None,
};
op.signature = Some(
[
vec!["Q".into(); inputs.num_qubits],
vec!["B".into(); inputs.num_bits],
]
.concat(),
);
op
}
/// Builds a classical pytket operation acting on `bit_count` bits and no
/// qubits, carrying the given classical payload.
pub fn make_tk1_classical_operation(
    pytket_optype: tket_json_rs::OpType,
    bit_count: usize,
    classical: tket_json_rs::circuit_json::Classical,
) -> tket_json_rs::circuit_json::Operation {
    let mut operation = make_tk1_operation(
        pytket_optype,
        MakeOperationArgs {
            num_qubits: 0,
            num_bits: bit_count,
            params: Cow::Borrowed(&[]),
        },
    );
    operation.classical = Some(Box::new(classical));
    operation
}
/// Builds a `ClExpr` pytket operation over `bit_count` bits.
///
/// Each bit index is mapped to itself in `bit_posn`; `registers`,
/// `output_bits` and `expression` are stored in the classical expression.
pub fn make_tk1_classical_expression(
    bit_count: usize,
    output_bits: &[u32],
    registers: &[InputClRegister],
    expression: tket_json_rs::clexpr::operator::ClOperator,
) -> tket_json_rs::circuit_json::Operation {
    let mut classical_expr = tket_json_rs::clexpr::ClExpr::default();
    classical_expr.expr = expression;
    classical_expr.output_posn = tket_json_rs::clexpr::ClRegisterBits(output_bits.to_vec());
    classical_expr.reg_posn = registers.to_vec();
    classical_expr.bit_posn = (0..bit_count as u32).map(|i| (i, i)).collect();

    let mut operation = make_tk1_operation(
        tket_json_rs::OpType::ClExpr,
        MakeOperationArgs {
            num_qubits: 0,
            num_bits: bit_count,
            params: Cow::Borrowed(&[]),
        },
    );
    operation.classical_expr = Some(classical_expr);
    operation
}