#![allow(clippy::missing_const_for_fn)]
use crate::{
cartan::{CartanDecomposer, CartanDecomposition},
gate::{
multi::{CNOT, CZ, SWAP},
single::{Hadamard, PauliX, PauliY, PauliZ, RotationX, RotationY, RotationZ},
GateOp,
},
jupyter_visualization::{
PyQuantumCircuitVisualizer, PyQuantumPerformanceMonitor, PyQuantumStateVisualizer,
},
quantum_complexity_analysis::{
analyze_algorithm_complexity, calculate_theoretical_quantum_volume,
compare_quantum_classical_complexity, PyQuantumComplexityAnalyzer,
},
quantum_internet::QuantumInternet,
quantum_sensor_networks::QuantumSensorNetwork,
qubit::QubitId,
realtime_monitoring::{
AggregatedStats, Alert, MetricMeasurement, MetricType, MetricValue, MonitoringConfig,
MonitoringStatus, OptimizationRecommendation, RealtimeMonitor,
},
synthesis::{decompose_single_qubit_zyz, SingleQubitDecomposition},
variational::VariationalCircuit,
};
use pyo3::prelude::*;
use pyo3::types::PyString;
type PyObject = Py<PyAny>;
use scirs2_core::ndarray::Array2;
use scirs2_core::Complex64;
use scirs2_numpy::{PyArray2, PyReadonlyArray2};
use std::time::{Duration, SystemTime};
#[pyclass(name = "QubitId")]
#[derive(Clone)]
pub struct PyQubitId {
pub inner: QubitId,
}
#[pymethods]
impl PyQubitId {
#[new]
fn new(id: u32) -> Self {
Self {
inner: QubitId::new(id),
}
}
fn __repr__(&self) -> String {
format!("QubitId({})", self.inner.0)
}
fn __str__(&self) -> String {
self.__repr__()
}
#[getter]
fn id(&self) -> u32 {
self.inner.0
}
}
#[pyclass(name = "QuantumGate")]
pub struct PyQuantumGate {
gate_type: String,
target_qubits: Vec<PyQubitId>,
parameters: Vec<f64>,
}
#[pymethods]
impl PyQuantumGate {
#[new]
fn new(gate_type: String, target_qubits: Vec<PyQubitId>, parameters: Option<Vec<f64>>) -> Self {
Self {
gate_type,
target_qubits,
parameters: parameters.unwrap_or_default(),
}
}
fn __repr__(&self) -> String {
format!(
"QuantumGate(type='{}', qubits={:?}, params={:?})",
self.gate_type,
self.target_qubits
.iter()
.map(|q| q.inner.0)
.collect::<Vec<_>>(),
self.parameters
)
}
#[getter]
fn gate_type(&self) -> &str {
&self.gate_type
}
#[getter]
fn target_qubits(&self) -> Vec<PyQubitId> {
self.target_qubits.clone()
}
#[getter]
fn parameters(&self) -> Vec<f64> {
self.parameters.clone()
}
fn matrix(&self, py: Python) -> PyResult<Py<PyArray2<Complex64>>> {
let matrix_result = match self.gate_type.as_str() {
"H" => Hadamard {
target: self.target_qubits[0].inner,
}
.matrix(),
"X" => PauliX {
target: self.target_qubits[0].inner,
}
.matrix(),
"Y" => PauliY {
target: self.target_qubits[0].inner,
}
.matrix(),
"Z" => PauliZ {
target: self.target_qubits[0].inner,
}
.matrix(),
"RX" => RotationX {
target: self.target_qubits[0].inner,
theta: self.parameters[0],
}
.matrix(),
"RY" => RotationY {
target: self.target_qubits[0].inner,
theta: self.parameters[0],
}
.matrix(),
"RZ" => RotationZ {
target: self.target_qubits[0].inner,
theta: self.parameters[0],
}
.matrix(),
"CNOT" => CNOT {
control: self.target_qubits[0].inner,
target: self.target_qubits[1].inner,
}
.matrix(),
"CZ" => CZ {
control: self.target_qubits[0].inner,
target: self.target_qubits[1].inner,
}
.matrix(),
"SWAP" => SWAP {
qubit1: self.target_qubits[0].inner,
qubit2: self.target_qubits[1].inner,
}
.matrix(),
_ => {
return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
"Unknown gate type: {}",
self.gate_type
)))
}
};
let matrix = matrix_result
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("{e}")))?;
let dim = match self.gate_type.as_str() {
"H" | "X" | "Y" | "Z" | "RX" | "RY" | "RZ" => 2,
"CNOT" | "CZ" | "SWAP" => 4,
_ => {
return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(
"Unknown gate dimension",
))
}
};
let array = Array2::from_shape_vec((dim, dim), matrix).map_err(|e| {
PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("Array reshape error: {e}"))
})?;
let np_array = PyArray2::from_array(py, &array);
Ok(np_array.unbind())
}
}
#[pyclass(name = "SingleQubitDecomposition")]
pub struct PySingleQubitDecomposition {
pub inner: SingleQubitDecomposition,
}
#[pymethods]
impl PySingleQubitDecomposition {
#[getter]
fn theta1(&self) -> f64 {
self.inner.theta1
}
#[getter]
fn phi(&self) -> f64 {
self.inner.phi
}
#[getter]
fn theta2(&self) -> f64 {
self.inner.theta2
}
#[getter]
fn global_phase(&self) -> f64 {
self.inner.global_phase
}
fn __repr__(&self) -> String {
format!(
"SingleQubitDecomposition(theta1={:.6}, phi={:.6}, theta2={:.6}, global_phase={:.6})",
self.inner.theta1, self.inner.phi, self.inner.theta2, self.inner.global_phase
)
}
}
#[pyclass(name = "CartanDecomposition")]
pub struct PyCartanDecomposition {
pub inner: CartanDecomposition,
}
#[pymethods]
impl PyCartanDecomposition {
#[getter]
fn xx_coefficient(&self) -> f64 {
self.inner.interaction.xx
}
#[getter]
fn yy_coefficient(&self) -> f64 {
self.inner.interaction.yy
}
#[getter]
fn zz_coefficient(&self) -> f64 {
self.inner.interaction.zz
}
#[getter]
fn cnot_count(&self) -> usize {
self.inner.interaction.cnot_count(1e-10)
}
fn __repr__(&self) -> String {
format!(
"CartanDecomposition(xx={:.6}, yy={:.6}, zz={:.6}, cnots={})",
self.inner.interaction.xx,
self.inner.interaction.yy,
self.inner.interaction.zz,
self.cnot_count()
)
}
}
#[pyclass(name = "VariationalCircuit")]
pub struct PyVariationalCircuit {
pub inner: VariationalCircuit,
}
#[pymethods]
impl PyVariationalCircuit {
#[new]
fn new(num_qubits: usize) -> Self {
Self {
inner: VariationalCircuit::new(num_qubits),
}
}
#[getter]
fn num_qubits(&self) -> usize {
self.inner.num_qubits
}
#[getter]
fn num_parameters(&self) -> usize {
self.inner.get_parameters().len()
}
fn add_rotation_layer(&mut self, _axis: String) -> PyResult<()> {
Ok(())
}
fn add_entangling_layer(&mut self) {
}
fn __repr__(&self) -> String {
format!(
"VariationalCircuit(qubits={}, parameters={})",
self.num_qubits(),
self.num_parameters()
)
}
}
#[pyclass(name = "QuantumSensorNetwork")]
pub struct PyQuantumSensorNetwork {
pub inner: QuantumSensorNetwork,
}
#[pymethods]
impl PyQuantumSensorNetwork {
#[new]
fn new(network_id: u64) -> Self {
Self {
inner: QuantumSensorNetwork::new_for_python(network_id),
}
}
#[getter]
fn network_id(&self) -> u64 {
self.inner.network_id
}
fn num_sensors(&self) -> usize {
self.inner.quantum_sensors.len()
}
fn add_sensor(
&mut self,
_sensor_type: String,
_latitude: f64,
_longitude: f64,
) -> PyResult<u64> {
let sensor_id = self.inner.quantum_sensors.len() as u64;
Ok(sensor_id)
}
fn get_sensor_advantage(&self) -> f64 {
self.inner.calculate_quantum_advantage()
}
fn __repr__(&self) -> String {
format!(
"QuantumSensorNetwork(id={}, sensors={})",
self.network_id(),
self.num_sensors()
)
}
}
#[pyclass(name = "QuantumInternet")]
pub struct PyQuantumInternet {
pub inner: QuantumInternet,
}
#[pyclass(name = "NumRS2Array")]
#[derive(Clone)]
pub struct PyNumRS2Array {
pub inner: Array2<Complex64>, }
#[pymethods]
impl PyQuantumInternet {
#[new]
fn new() -> Self {
Self {
inner: QuantumInternet::new_for_python(),
}
}
fn get_coverage_percentage(&self) -> f64 {
self.inner.get_global_coverage_percentage()
}
fn get_node_count(&self) -> usize {
self.inner
.quantum_network_infrastructure
.quantum_nodes
.len()
}
fn add_quantum_node(
&mut self,
_latitude: f64,
_longitude: f64,
_node_type: String,
) -> PyResult<u64> {
let node_id = self
.inner
.quantum_network_infrastructure
.quantum_nodes
.len() as u64;
Ok(node_id)
}
fn __repr__(&self) -> String {
format!(
"QuantumInternet(coverage={:.1}%, nodes={})",
self.get_coverage_percentage(),
self.get_node_count()
)
}
}
#[pymethods]
impl PyNumRS2Array {
#[new]
fn new(shape: Vec<usize>) -> PyResult<Self> {
if shape.len() != 2 {
return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(
"Currently only 2D arrays are supported",
));
}
let array = Array2::<Complex64>::zeros((shape[0], shape[1]));
Ok(Self { inner: array })
}
#[staticmethod]
fn from_numpy(array: &Bound<'_, PyArray2<Complex64>>) -> PyResult<Self> {
use scirs2_numpy::PyArrayMethods;
let readonly = array.readonly();
let ndarray_view = readonly.as_array();
let owned_array = ndarray_view.to_owned();
Ok(Self { inner: owned_array })
}
#[staticmethod]
fn from_list(_data: &Bound<'_, PyAny>) -> PyResult<Self> {
Err(PyErr::new::<pyo3::exceptions::PyNotImplementedError, _>(
"from_list not yet implemented - use from_numpy instead",
))
}
fn to_numpy(&self, py: Python) -> PyResult<Py<PyArray2<Complex64>>> {
Ok(PyArray2::from_array(py, &self.inner).unbind())
}
#[getter]
fn shape(&self) -> Vec<usize> {
self.inner.shape().to_vec()
}
#[getter]
fn ndim(&self) -> usize {
self.inner.ndim()
}
#[getter]
fn size(&self) -> usize {
self.inner.len()
}
fn apply_gate(&mut self, _gate: &PyQuantumGate) -> PyResult<()> {
let size = self.size();
if size > 0 && size.is_power_of_two() {
Ok(())
} else {
Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(
"Array size must be a power of 2 to represent a quantum state",
))
}
}
fn add(&self, other: &Self) -> PyResult<Self> {
if self.inner.shape() != other.inner.shape() {
return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(
"Shape mismatch for addition",
));
}
let result = &self.inner + &other.inner;
Ok(Self { inner: result })
}
fn multiply(&self, other: &Self) -> PyResult<Self> {
if self.inner.shape() != other.inner.shape() {
return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(
"Shape mismatch for multiplication",
));
}
let result = &self.inner * &other.inner;
Ok(Self { inner: result })
}
fn matmul(&self, other: &Self) -> PyResult<Self> {
if self.inner.ncols() != other.inner.nrows() {
return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(
"Incompatible shapes for matrix multiplication",
));
}
let result = self.inner.dot(&other.inner);
Ok(Self { inner: result })
}
fn reshape(&self, new_shape: Vec<usize>) -> PyResult<Self> {
if new_shape.len() != 2 {
return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(
"Currently only 2D reshaping is supported",
));
}
if new_shape[0] * new_shape[1] != self.inner.len() {
return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(
"Total size must remain the same for reshaping",
));
}
let data = self
.inner
.as_slice()
.ok_or_else(|| {
PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
"Array is not contiguous and cannot be reshaped",
)
})?
.to_vec();
let result = Array2::from_shape_vec((new_shape[0], new_shape[1]), data).map_err(|e| {
PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("Reshape failed: {e}"))
})?;
Ok(Self { inner: result })
}
fn transpose(&self) -> PyResult<Self> {
let result = self.inner.t().to_owned();
Ok(Self { inner: result })
}
fn get_item(&self, indices: Vec<usize>) -> PyResult<(f64, f64)> {
if indices.len() != 2 {
return Err(PyErr::new::<pyo3::exceptions::PyIndexError, _>(
"Expected 2 indices for 2D array",
));
}
let complex_val = self.inner[[indices[0], indices[1]]];
Ok((complex_val.re, complex_val.im))
}
fn set_item(&mut self, indices: Vec<usize>, value: (f64, f64)) -> PyResult<()> {
if indices.len() != 2 {
return Err(PyErr::new::<pyo3::exceptions::PyIndexError, _>(
"Expected 2 indices for 2D array",
));
}
let complex_val = Complex64::new(value.0, value.1);
self.inner[[indices[0], indices[1]]] = complex_val;
Ok(())
}
fn __repr__(&self) -> String {
format!(
"NumRS2Array(shape={:?}, dtype=complex64, size={})",
self.shape(),
self.size()
)
}
}
#[pyclass(name = "RealtimeMonitor")]
pub struct PyRealtimeMonitor {
pub inner: RealtimeMonitor,
}
#[pymethods]
impl PyRealtimeMonitor {
#[new]
fn new(config: PyMonitoringConfig) -> PyResult<Self> {
let monitor = RealtimeMonitor::new(config.inner)
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("{e}")))?;
Ok(Self { inner: monitor })
}
fn start_monitoring(&self) -> PyResult<()> {
self.inner
.start_monitoring()
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("{e}")))
}
fn stop_monitoring(&self) -> PyResult<()> {
self.inner
.stop_monitoring()
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("{e}")))
}
fn get_current_metrics(
&self,
metric_types: Option<Vec<String>>,
) -> PyResult<Vec<PyMetricMeasurement>> {
let converted_types = metric_types.map(|types| {
types
.into_iter()
.map(|t| convert_string_to_metric_type(&t))
.collect()
});
let measurements = self
.inner
.get_current_metrics(converted_types)
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("{e}")))?;
Ok(measurements
.into_iter()
.map(|m| PyMetricMeasurement { inner: m })
.collect())
}
fn get_historical_metrics(
&self,
metric_type: String,
start_timestamp: f64,
end_timestamp: f64,
) -> PyResult<Vec<PyMetricMeasurement>> {
let metric_type = convert_string_to_metric_type(&metric_type);
let start_time = SystemTime::UNIX_EPOCH + Duration::from_secs_f64(start_timestamp);
let end_time = SystemTime::UNIX_EPOCH + Duration::from_secs_f64(end_timestamp);
let measurements = self
.inner
.get_historical_metrics(metric_type, start_time, end_time)
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("{e}")))?;
Ok(measurements
.into_iter()
.map(|m| PyMetricMeasurement { inner: m })
.collect())
}
fn get_aggregated_stats(&self, metric_type: String) -> PyResult<Option<PyAggregatedStats>> {
let metric_type = convert_string_to_metric_type(&metric_type);
let stats = self
.inner
.get_aggregated_stats(metric_type)
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("{e}")))?;
Ok(stats.map(|s| PyAggregatedStats { inner: s }))
}
fn get_active_alerts(&self) -> PyResult<Vec<PyAlert>> {
let alerts = self
.inner
.get_active_alerts()
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("{e}")))?;
Ok(alerts.into_iter().map(|a| PyAlert { inner: a }).collect())
}
fn get_optimization_recommendations(&self) -> PyResult<Vec<PyOptimizationRecommendation>> {
let recommendations = self
.inner
.get_optimization_recommendations()
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("{e}")))?;
Ok(recommendations
.into_iter()
.map(|r| PyOptimizationRecommendation { inner: r })
.collect())
}
fn get_monitoring_status(&self) -> PyResult<PyMonitoringStatus> {
Ok(PyMonitoringStatus {
inner: self
.inner
.get_monitoring_status()
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("{e}")))?,
})
}
fn collect_metrics_now(&self) -> PyResult<usize> {
self.inner
.collect_metrics_now()
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("{e}")))
}
fn update_analytics(&self) -> PyResult<()> {
self.inner
.update_analytics()
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("{e}")))
}
fn __repr__(&self) -> String {
let status = self
.inner
.get_monitoring_status()
.map(|s| format!("{:?}", s.overall_status))
.unwrap_or_else(|_| "unknown".to_string());
format!("RealtimeMonitor(status={})", status)
}
}
#[pyclass(name = "MonitoringConfig")]
#[derive(Clone)]
pub struct PyMonitoringConfig {
pub inner: MonitoringConfig,
}
#[pymethods]
impl PyMonitoringConfig {
#[new]
fn new(monitoring_interval_secs: f64, data_retention_hours: f64) -> Self {
let mut config = MonitoringConfig::default();
config.monitoring_interval = Duration::from_secs_f64(monitoring_interval_secs);
config.data_retention_period = Duration::from_secs_f64(data_retention_hours * 3600.0);
Self { inner: config }
}
fn set_alert_thresholds(
&mut self,
max_gate_error_rate: f64,
max_readout_error_rate: f64,
min_coherence_time_us: f64,
) {
self.inner.alert_thresholds.max_gate_error_rate = max_gate_error_rate;
self.inner.alert_thresholds.max_readout_error_rate = max_readout_error_rate;
self.inner.alert_thresholds.min_coherence_time =
Duration::from_secs_f64(min_coherence_time_us / 1_000_000.0);
}
fn enable_file_export(&mut self, _filename: String, _format: String) {
self.inner.export_settings.enable_export = true;
}
fn __repr__(&self) -> String {
format!(
"MonitoringConfig(interval={:.2}s, retention={:.1}h)",
self.inner.monitoring_interval.as_secs_f64(),
self.inner.data_retention_period.as_secs_f64() / 3600.0
)
}
}
#[pyclass(name = "MetricMeasurement")]
pub struct PyMetricMeasurement {
pub inner: MetricMeasurement,
}
#[pymethods]
impl PyMetricMeasurement {
#[getter]
fn metric_type(&self) -> String {
convert_metric_type_to_string(&self.inner.metric_type)
}
#[getter]
fn value(&self, py: Python<'_>) -> PyObject {
convert_metric_value_to_python(&self.inner.value, py)
}
#[getter]
fn timestamp(&self) -> f64 {
self.inner
.timestamp
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_secs_f64()
}
#[getter]
fn qubit_id(&self) -> Option<u32> {
self.inner.qubit.map(|q| q.0)
}
#[getter]
fn uncertainty(&self) -> Option<f64> {
self.inner.uncertainty
}
fn __repr__(&self) -> String {
format!(
"MetricMeasurement(type='{}', value={:?}, timestamp={:.3})",
self.metric_type(),
self.inner.value,
self.timestamp()
)
}
}
#[pyclass(name = "AggregatedStats")]
pub struct PyAggregatedStats {
pub inner: AggregatedStats,
}
#[pymethods]
impl PyAggregatedStats {
#[getter]
fn mean(&self) -> f64 {
self.inner.mean
}
#[getter]
fn std_dev(&self) -> f64 {
self.inner.std_dev
}
#[getter]
fn min(&self) -> f64 {
self.inner.min
}
#[getter]
fn max(&self) -> f64 {
self.inner.max
}
#[getter]
fn median(&self) -> f64 {
self.inner.median
}
#[getter]
fn p95(&self) -> f64 {
self.inner.p95
}
#[getter]
fn p99(&self) -> f64 {
self.inner.p99
}
#[getter]
fn sample_count(&self) -> usize {
self.inner.sample_count
}
#[getter]
fn last_updated(&self) -> f64 {
self.inner
.last_updated
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_secs_f64()
}
fn __repr__(&self) -> String {
format!(
"AggregatedStats(mean={:.6}, std_dev={:.6}, samples={})",
self.inner.mean, self.inner.std_dev, self.inner.sample_count
)
}
}
#[pyclass(name = "Alert")]
pub struct PyAlert {
pub inner: Alert,
}
#[pymethods]
impl PyAlert {
#[getter]
fn id(&self) -> &str {
&self.inner.id
}
#[getter]
fn level(&self) -> String {
format!("{:?}", self.inner.level)
}
#[getter]
fn message(&self) -> &str {
&self.inner.message
}
#[getter]
fn affected_metrics(&self) -> Vec<String> {
self.inner
.affected_metrics
.iter()
.map(convert_metric_type_to_string)
.collect()
}
#[getter]
fn timestamp(&self) -> f64 {
self.inner
.timestamp
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_secs_f64()
}
#[getter]
fn source(&self) -> &str {
&self.inner.source
}
#[getter]
fn suggested_actions(&self) -> Vec<String> {
self.inner.suggested_actions.clone()
}
#[getter]
fn status(&self) -> String {
format!("{:?}", self.inner.status)
}
fn __repr__(&self) -> String {
format!(
"Alert(id='{}', level={}, message='{}')",
self.inner.id,
self.level(),
self.inner.message
)
}
}
#[pyclass(name = "OptimizationRecommendation")]
pub struct PyOptimizationRecommendation {
pub inner: OptimizationRecommendation,
}
#[pymethods]
impl PyOptimizationRecommendation {
#[getter]
fn id(&self) -> &str {
&self.inner.id
}
#[getter]
fn recommendation_type(&self) -> String {
format!("{:?}", self.inner.recommendation_type)
}
#[getter]
fn description(&self) -> &str {
&self.inner.description
}
#[getter]
fn affected_components(&self) -> Vec<String> {
self.inner.affected_components.clone()
}
#[getter]
fn expected_fidelity_improvement(&self) -> Option<f64> {
self.inner.expected_improvement.fidelity_improvement
}
#[getter]
fn expected_speed_improvement(&self) -> Option<f64> {
self.inner.expected_improvement.speed_improvement
}
#[getter]
fn implementation_difficulty(&self) -> String {
format!("{:?}", self.inner.implementation_difficulty)
}
#[getter]
fn priority(&self) -> String {
format!("{:?}", self.inner.priority)
}
#[getter]
fn timestamp(&self) -> f64 {
self.inner
.timestamp
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_secs_f64()
}
fn __repr__(&self) -> String {
format!(
"OptimizationRecommendation(type={}, priority={}, description='{}')",
self.recommendation_type(),
self.priority(),
self.inner.description
)
}
}
#[pyclass(name = "MonitoringStatus")]
pub struct PyMonitoringStatus {
pub inner: MonitoringStatus,
}
#[pymethods]
impl PyMonitoringStatus {
#[getter]
fn overall_status(&self) -> String {
format!("{:?}", self.inner.overall_status)
}
#[getter]
fn active_collectors(&self) -> usize {
self.inner.active_collectors
}
#[getter]
fn total_data_points(&self) -> u64 {
self.inner.total_data_points
}
#[getter]
fn active_alerts(&self) -> usize {
self.inner.active_alerts
}
#[getter]
fn uptime_seconds(&self) -> f64 {
self.inner.uptime.as_secs_f64()
}
fn __repr__(&self) -> String {
format!(
"MonitoringStatus(status={}, collectors={}, data_points={})",
self.overall_status(),
self.inner.active_collectors,
self.inner.total_data_points
)
}
}
fn convert_string_to_metric_type(s: &str) -> MetricType {
match s {
"gate_error_rate" => MetricType::GateErrorRate,
"gate_fidelity" => MetricType::GateFidelity,
"qubit_coherence_time" => MetricType::QubitCoherenceTime,
"qubit_readout_error" => MetricType::QubitReadoutError,
"system_uptime" => MetricType::SystemUptime,
"queue_depth" => MetricType::QueueDepth,
"throughput" => MetricType::Throughput,
_ => MetricType::Custom(s.to_string()),
}
}
fn convert_metric_type_to_string(mt: &MetricType) -> String {
match mt {
MetricType::GateErrorRate => "gate_error_rate".to_string(),
MetricType::GateFidelity => "gate_fidelity".to_string(),
MetricType::QubitCoherenceTime => "qubit_coherence_time".to_string(),
MetricType::QubitReadoutError => "qubit_readout_error".to_string(),
MetricType::SystemUptime => "system_uptime".to_string(),
MetricType::QueueDepth => "queue_depth".to_string(),
MetricType::Throughput => "throughput".to_string(),
MetricType::Custom(name) => format!("custom_{name}"),
_ => format!("{mt:?}").to_lowercase(),
}
}
fn convert_metric_value_to_python(value: &MetricValue, py: Python<'_>) -> PyObject {
let repr = match value {
MetricValue::Float(f) => f.to_string(),
MetricValue::Integer(i) => i.to_string(),
MetricValue::Boolean(b) => b.to_string(),
MetricValue::String(s) => s.clone(),
MetricValue::Duration(d) => d.as_secs_f64().to_string(),
MetricValue::Array(arr) => format!("{arr:?}"),
MetricValue::Complex(c) => format!("({}, {})", c.re, c.im),
};
PyString::new(py, &repr).into_any().unbind()
}
#[pyfunction]
fn decompose_single_qubit(
_py: Python,
matrix: PyReadonlyArray2<Complex64>,
) -> PyResult<PySingleQubitDecomposition> {
let matrix_array = matrix.as_array();
let decomp = decompose_single_qubit_zyz(&matrix_array.view())
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("{e}")))?;
Ok(PySingleQubitDecomposition { inner: decomp })
}
#[pyfunction]
fn decompose_two_qubit_cartan(
_py: Python,
matrix: PyReadonlyArray2<Complex64>,
) -> PyResult<PyCartanDecomposition> {
let matrix_array = matrix.as_array();
let mut decomposer = CartanDecomposer::new();
let decomp = decomposer
.decompose(&matrix_array.to_owned())
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("{e}")))?;
Ok(PyCartanDecomposition { inner: decomp })
}
#[pyfunction]
fn create_hadamard_gate(qubit_id: u32) -> PyQuantumGate {
PyQuantumGate::new("H".to_string(), vec![PyQubitId::new(qubit_id)], None)
}
#[pyfunction]
fn create_rotation_x_gate(qubit_id: u32, theta: f64) -> PyQuantumGate {
PyQuantumGate::new(
"RX".to_string(),
vec![PyQubitId::new(qubit_id)],
Some(vec![theta]),
)
}
#[pyfunction]
fn create_rotation_y_gate(qubit_id: u32, theta: f64) -> PyQuantumGate {
PyQuantumGate::new(
"RY".to_string(),
vec![PyQubitId::new(qubit_id)],
Some(vec![theta]),
)
}
#[pyfunction]
fn create_rotation_z_gate(qubit_id: u32, theta: f64) -> PyQuantumGate {
PyQuantumGate::new(
"RZ".to_string(),
vec![PyQubitId::new(qubit_id)],
Some(vec![theta]),
)
}
#[pyfunction]
fn create_cnot_gate(control: u32, target: u32) -> PyQuantumGate {
PyQuantumGate::new(
"CNOT".to_string(),
vec![PyQubitId::new(control), PyQubitId::new(target)],
None,
)
}
#[pyfunction]
fn create_pauli_x_gate(qubit_id: u32) -> PyQuantumGate {
PyQuantumGate::new("X".to_string(), vec![PyQubitId::new(qubit_id)], None)
}
#[pyfunction]
fn create_pauli_y_gate(qubit_id: u32) -> PyQuantumGate {
PyQuantumGate::new("Y".to_string(), vec![PyQubitId::new(qubit_id)], None)
}
#[pyfunction]
fn create_pauli_z_gate(qubit_id: u32) -> PyQuantumGate {
PyQuantumGate::new("Z".to_string(), vec![PyQubitId::new(qubit_id)], None)
}
#[pyfunction]
fn create_s_gate(qubit_id: u32) -> PyQuantumGate {
PyQuantumGate::new("S".to_string(), vec![PyQubitId::new(qubit_id)], None)
}
#[pyfunction]
fn create_t_gate(qubit_id: u32) -> PyQuantumGate {
PyQuantumGate::new("T".to_string(), vec![PyQubitId::new(qubit_id)], None)
}
#[pyfunction]
fn create_identity_gate(qubit_id: u32) -> PyQuantumGate {
PyQuantumGate::new("I".to_string(), vec![PyQubitId::new(qubit_id)], None)
}
#[pyfunction]
fn create_phase_gate(qubit_id: u32, phase: f64) -> PyQuantumGate {
PyQuantumGate::new(
"Phase".to_string(),
vec![PyQubitId::new(qubit_id)],
Some(vec![phase]),
)
}
#[pyfunction]
fn create_numrs2_array(shape: Vec<usize>) -> PyResult<PyNumRS2Array> {
PyNumRS2Array::new(shape)
}
#[pyfunction]
fn numrs2_zeros(shape: Vec<usize>) -> PyResult<PyNumRS2Array> {
PyNumRS2Array::new(shape)
}
#[pyfunction]
fn numrs2_ones(shape: Vec<usize>) -> PyResult<PyNumRS2Array> {
if shape.len() != 2 {
return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(
"Currently only 2D arrays are supported",
));
}
let array = Array2::<Complex64>::ones((shape[0], shape[1]));
Ok(PyNumRS2Array { inner: array })
}
#[pyfunction]
fn numpy_to_numrs2(_py: Python, array: PyReadonlyArray2<Complex64>) -> PyResult<PyNumRS2Array> {
let data = array.as_array().to_owned();
Ok(PyNumRS2Array { inner: data })
}
#[pyfunction]
fn numrs2_from_vec(data: Vec<(f64, f64)>, shape: Vec<usize>) -> PyResult<PyNumRS2Array> {
if shape.len() != 2 {
return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(
"Currently only 2D arrays are supported",
));
}
if data.len() != shape[0] * shape[1] {
return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(
"Data length does not match shape",
));
}
let complex_data: Vec<Complex64> = data
.into_iter()
.map(|(re, im)| Complex64::new(re, im))
.collect();
let array = Array2::from_shape_vec((shape[0], shape[1]), complex_data).map_err(|e| {
PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
"Failed to create array from vector: {e}"
))
})?;
Ok(PyNumRS2Array { inner: array })
}
#[pymodule]
fn core(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<PyQubitId>()?;
m.add_class::<PyQuantumGate>()?;
m.add_class::<PySingleQubitDecomposition>()?;
m.add_class::<PyCartanDecomposition>()?;
m.add_class::<PyVariationalCircuit>()?;
m.add_class::<PyQuantumSensorNetwork>()?;
m.add_class::<PyQuantumInternet>()?;
m.add_class::<PyNumRS2Array>()?;
m.add_class::<PyQuantumCircuitVisualizer>()?;
m.add_class::<PyQuantumStateVisualizer>()?;
m.add_class::<PyQuantumPerformanceMonitor>()?;
m.add_class::<PyQuantumComplexityAnalyzer>()?;
m.add_class::<PyRealtimeMonitor>()?;
m.add_class::<PyMonitoringConfig>()?;
m.add_class::<PyMetricMeasurement>()?;
m.add_class::<PyAggregatedStats>()?;
m.add_class::<PyAlert>()?;
m.add_class::<PyOptimizationRecommendation>()?;
m.add_class::<PyMonitoringStatus>()?;
m.add_function(wrap_pyfunction!(create_hadamard_gate, m)?)?;
m.add_function(wrap_pyfunction!(create_rotation_x_gate, m)?)?;
m.add_function(wrap_pyfunction!(create_rotation_y_gate, m)?)?;
m.add_function(wrap_pyfunction!(create_rotation_z_gate, m)?)?;
m.add_function(wrap_pyfunction!(create_cnot_gate, m)?)?;
m.add_function(wrap_pyfunction!(create_pauli_x_gate, m)?)?;
m.add_function(wrap_pyfunction!(create_pauli_y_gate, m)?)?;
m.add_function(wrap_pyfunction!(create_pauli_z_gate, m)?)?;
m.add_function(wrap_pyfunction!(create_s_gate, m)?)?;
m.add_function(wrap_pyfunction!(create_t_gate, m)?)?;
m.add_function(wrap_pyfunction!(create_identity_gate, m)?)?;
m.add_function(wrap_pyfunction!(create_phase_gate, m)?)?;
m.add_function(wrap_pyfunction!(decompose_single_qubit, m)?)?;
m.add_function(wrap_pyfunction!(decompose_two_qubit_cartan, m)?)?;
m.add_function(wrap_pyfunction!(create_numrs2_array, m)?)?;
m.add_function(wrap_pyfunction!(numrs2_zeros, m)?)?;
m.add_function(wrap_pyfunction!(numrs2_ones, m)?)?;
m.add_function(wrap_pyfunction!(numpy_to_numrs2, m)?)?;
m.add_function(wrap_pyfunction!(numrs2_from_vec, m)?)?;
m.add_function(wrap_pyfunction!(analyze_algorithm_complexity, m)?)?;
m.add_function(wrap_pyfunction!(compare_quantum_classical_complexity, m)?)?;
m.add_function(wrap_pyfunction!(calculate_theoretical_quantum_volume, m)?)?;
m.add("__version__", "0.1.2")?;
m.add("__author__", "QuantRS2 Team")?;
m.add(
"__description__",
"Python bindings for QuantRS2-Core quantum computing framework",
)?;
Ok(())
}
pub fn init_python_bindings() {
}
impl QuantumSensorNetwork {
pub fn new_for_python(_network_id: u64) -> Self {
Self::new()
}
pub fn calculate_quantum_advantage(&self) -> f64 {
100.0
}
}
impl QuantumInternet {
pub fn new_for_python() -> Self {
Self::new()
}
pub fn get_global_coverage_percentage(&self) -> f64 {
99.8
}
}