use crate::error::{MLError, Result};
use scirs2_core::ndarray::{Array1, Array2};
use scirs2_core::Complex64;
use std::collections::HashMap;
use super::{CType, TQDevice, TQModule, TQParameter};
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum SingleQubitNoiseType {
Depolarizing(f64),
AmplitudeDamping(f64),
PhaseDamping(f64),
BitFlip(f64),
PhaseFlip(f64),
}
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum TwoQubitNoiseType {
Depolarizing(f64),
CorrelatedDephasing(f64),
CrossTalk(f64),
}
#[derive(Debug, Clone)]
pub struct NoiseModel {
pub single_qubit_errors: HashMap<usize, SingleQubitNoiseType>,
pub two_qubit_errors: HashMap<(usize, usize), TwoQubitNoiseType>,
pub readout_errors: HashMap<usize, f64>,
pub coherence_times: HashMap<usize, (f64, f64)>,
pub gate_times: GateTimes,
pub noise_scale: f64,
}
#[derive(Debug, Clone)]
pub struct GateTimes {
pub single_qubit: f64,
pub two_qubit: f64,
pub measurement: f64,
}
impl Default for GateTimes {
fn default() -> Self {
Self {
single_qubit: 0.05, two_qubit: 0.3, measurement: 1.0, }
}
}
impl NoiseModel {
pub fn ideal() -> Self {
Self {
single_qubit_errors: HashMap::new(),
two_qubit_errors: HashMap::new(),
readout_errors: HashMap::new(),
coherence_times: HashMap::new(),
gate_times: GateTimes::default(),
noise_scale: 0.0,
}
}
pub fn uniform_depolarizing(n_qubits: usize, p1: f64, p2: f64) -> Self {
let mut model = Self::ideal();
model.noise_scale = 1.0;
for q in 0..n_qubits {
model
.single_qubit_errors
.insert(q, SingleQubitNoiseType::Depolarizing(p1));
}
for q1 in 0..n_qubits {
for q2 in (q1 + 1)..n_qubits {
model
.two_qubit_errors
.insert((q1, q2), TwoQubitNoiseType::Depolarizing(p2));
}
}
model
}
pub fn from_ibm_properties(
n_qubits: usize,
t1_times: &[f64],
t2_times: &[f64],
single_gate_errors: &[f64],
two_gate_errors: &[(usize, usize, f64)],
readout_errors: &[f64],
) -> Self {
let mut model = Self::ideal();
model.noise_scale = 1.0;
for q in 0..n_qubits {
if q < t1_times.len() && q < t2_times.len() {
model.coherence_times.insert(q, (t1_times[q], t2_times[q]));
}
if q < single_gate_errors.len() {
model
.single_qubit_errors
.insert(q, SingleQubitNoiseType::Depolarizing(single_gate_errors[q]));
}
if q < readout_errors.len() {
model.readout_errors.insert(q, readout_errors[q]);
}
}
for (q1, q2, err) in two_gate_errors {
model
.two_qubit_errors
.insert((*q1, *q2), TwoQubitNoiseType::Depolarizing(*err));
}
model
}
pub fn effective_single_error(&self, qubit: usize) -> f64 {
self.single_qubit_errors
.get(&qubit)
.map(|e| match e {
SingleQubitNoiseType::Depolarizing(p) => *p,
SingleQubitNoiseType::AmplitudeDamping(p) => *p,
SingleQubitNoiseType::PhaseDamping(p) => *p,
SingleQubitNoiseType::BitFlip(p) => *p,
SingleQubitNoiseType::PhaseFlip(p) => *p,
})
.unwrap_or(0.0)
* self.noise_scale
}
pub fn effective_two_qubit_error(&self, q1: usize, q2: usize) -> f64 {
let key = if q1 < q2 { (q1, q2) } else { (q2, q1) };
self.two_qubit_errors
.get(&key)
.map(|e| match e {
TwoQubitNoiseType::Depolarizing(p) => *p,
TwoQubitNoiseType::CorrelatedDephasing(p) => *p,
TwoQubitNoiseType::CrossTalk(p) => *p,
})
.unwrap_or(0.0)
* self.noise_scale
}
}
#[derive(Debug, Clone)]
pub struct NoiseAwareGradientConfig {
pub shots: usize,
pub shift: f64,
pub include_noise_in_gradient: bool,
pub variance_reduction: VarianceReduction,
pub n_repetitions: usize,
}
impl Default for NoiseAwareGradientConfig {
fn default() -> Self {
Self {
shots: 1000,
shift: std::f64::consts::FRAC_PI_2,
include_noise_in_gradient: true,
variance_reduction: VarianceReduction::None,
n_repetitions: 1,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VarianceReduction {
None,
CommonRandomNumbers,
AntitheticVariates,
ControlVariates,
}
#[derive(Debug, Clone)]
pub struct NoiseAwareGradient {
pub noise_model: NoiseModel,
pub config: NoiseAwareGradientConfig,
gradient_variances: HashMap<String, f64>,
}
impl NoiseAwareGradient {
pub fn new(noise_model: NoiseModel) -> Self {
Self {
noise_model,
config: NoiseAwareGradientConfig::default(),
gradient_variances: HashMap::new(),
}
}
pub fn with_config(noise_model: NoiseModel, config: NoiseAwareGradientConfig) -> Self {
Self {
noise_model,
config,
gradient_variances: HashMap::new(),
}
}
pub fn compute_gradient<F>(
&mut self,
param_idx: usize,
current_params: &[f64],
expectation_fn: F,
) -> Result<(f64, f64)>
where
F: Fn(&[f64]) -> Result<f64>,
{
let mut params_plus = current_params.to_vec();
let mut params_minus = current_params.to_vec();
params_plus[param_idx] += self.config.shift;
params_minus[param_idx] -= self.config.shift;
let mut gradient_estimates = Vec::with_capacity(self.config.n_repetitions);
for _ in 0..self.config.n_repetitions {
let exp_plus = expectation_fn(¶ms_plus)?;
let exp_minus = expectation_fn(¶ms_minus)?;
let gradient = (exp_plus - exp_minus) / (2.0 * self.config.shift.sin());
gradient_estimates.push(gradient);
}
let mean = gradient_estimates.iter().sum::<f64>() / gradient_estimates.len() as f64;
let variance = if gradient_estimates.len() > 1 {
gradient_estimates
.iter()
.map(|g| (g - mean).powi(2))
.sum::<f64>()
/ (gradient_estimates.len() - 1) as f64
} else {
0.0
};
let corrected_gradient = if self.config.include_noise_in_gradient {
self.apply_noise_correction(mean, param_idx)
} else {
mean
};
Ok((corrected_gradient, variance))
}
fn apply_noise_correction(&self, gradient: f64, _param_idx: usize) -> f64 {
let avg_error: f64 = self
.noise_model
.single_qubit_errors
.values()
.map(|e| match e {
SingleQubitNoiseType::Depolarizing(p) => *p,
SingleQubitNoiseType::AmplitudeDamping(p) => *p,
SingleQubitNoiseType::PhaseDamping(p) => *p,
SingleQubitNoiseType::BitFlip(p) => *p,
SingleQubitNoiseType::PhaseFlip(p) => *p,
})
.sum::<f64>()
/ self.noise_model.single_qubit_errors.len().max(1) as f64;
let scale_factor = 1.0 / (1.0 - 2.0 * avg_error).max(0.1);
gradient * scale_factor
}
pub fn compute_all_gradients<F>(
&mut self,
params: &[f64],
expectation_fn: F,
) -> Result<(Vec<f64>, Vec<f64>)>
where
F: Fn(&[f64]) -> Result<f64> + Clone,
{
let mut gradients = Vec::with_capacity(params.len());
let mut variances = Vec::with_capacity(params.len());
for i in 0..params.len() {
let (grad, var) = self.compute_gradient(i, params, expectation_fn.clone())?;
gradients.push(grad);
variances.push(var);
}
Ok((gradients, variances))
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MitigationMethod {
None,
ZNE,
PEC,
ReadoutMitigation,
Twirling,
}
#[derive(Debug, Clone)]
pub struct MitigatedExpectationConfig {
pub method: MitigationMethod,
pub shots: usize,
pub zne_scale_factors: Vec<f64>,
pub zne_extrapolation: ZNEExtrapolation,
pub apply_readout_mitigation: bool,
}
impl Default for MitigatedExpectationConfig {
fn default() -> Self {
Self {
method: MitigationMethod::ZNE,
shots: 4000,
zne_scale_factors: vec![1.0, 1.5, 2.0],
zne_extrapolation: ZNEExtrapolation::Linear,
apply_readout_mitigation: true,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ZNEExtrapolation {
Linear,
Polynomial,
Exponential,
Richardson,
}
#[derive(Debug, Clone)]
pub struct MitigatedExpectation {
pub noise_model: NoiseModel,
pub config: MitigatedExpectationConfig,
readout_calibration: Option<Array2<f64>>,
}
impl MitigatedExpectation {
pub fn new(noise_model: NoiseModel) -> Self {
Self {
noise_model,
config: MitigatedExpectationConfig::default(),
readout_calibration: None,
}
}
pub fn with_config(noise_model: NoiseModel, config: MitigatedExpectationConfig) -> Self {
Self {
noise_model,
config,
readout_calibration: None,
}
}
pub fn compute<F>(&self, raw_expectation_fn: F) -> Result<f64>
where
F: Fn(f64) -> Result<f64>,
{
match self.config.method {
MitigationMethod::None => raw_expectation_fn(1.0),
MitigationMethod::ZNE => self.compute_zne(raw_expectation_fn),
MitigationMethod::ReadoutMitigation => {
let raw = raw_expectation_fn(1.0)?;
self.apply_readout_mitigation(raw)
}
_ => raw_expectation_fn(1.0), }
}
fn compute_zne<F>(&self, raw_expectation_fn: F) -> Result<f64>
where
F: Fn(f64) -> Result<f64>,
{
let mut scaled_values = Vec::with_capacity(self.config.zne_scale_factors.len());
for &scale in &self.config.zne_scale_factors {
let value = raw_expectation_fn(scale)?;
scaled_values.push((scale, value));
}
self.extrapolate_to_zero(&scaled_values)
}
fn extrapolate_to_zero(&self, points: &[(f64, f64)]) -> Result<f64> {
if points.is_empty() {
return Err(MLError::InvalidConfiguration(
"No data points for extrapolation".to_string(),
));
}
if points.len() == 1 {
return Ok(points[0].1);
}
match self.config.zne_extrapolation {
ZNEExtrapolation::Linear => {
let n = points.len() as f64;
let sum_x: f64 = points.iter().map(|(x, _)| x).sum();
let sum_y: f64 = points.iter().map(|(_, y)| y).sum();
let sum_xy: f64 = points.iter().map(|(x, y)| x * y).sum();
let sum_x2: f64 = points.iter().map(|(x, _)| x * x).sum();
let denom = n * sum_x2 - sum_x * sum_x;
if denom.abs() < 1e-10 {
return Ok(sum_y / n);
}
let a = (sum_y * sum_x2 - sum_x * sum_xy) / denom;
Ok(a) }
ZNEExtrapolation::Exponential => {
let log_points: Vec<(f64, f64)> = points
.iter()
.filter(|(_, y)| *y > 0.0)
.map(|(x, y)| (*x, y.ln()))
.collect();
if log_points.is_empty() {
return Ok(points[0].1);
}
let n = log_points.len() as f64;
let sum_x: f64 = log_points.iter().map(|(x, _)| x).sum();
let sum_y: f64 = log_points.iter().map(|(_, y)| y).sum();
let sum_xy: f64 = log_points.iter().map(|(x, y)| x * y).sum();
let sum_x2: f64 = log_points.iter().map(|(x, _)| x * x).sum();
let denom = n * sum_x2 - sum_x * sum_x;
if denom.abs() < 1e-10 {
return Ok((sum_y / n).exp());
}
let log_a = (sum_y * sum_x2 - sum_x * sum_xy) / denom;
Ok(log_a.exp())
}
_ => {
let sum_y: f64 = points.iter().map(|(_, y)| y).sum();
Ok(sum_y / points.len() as f64)
}
}
}
fn apply_readout_mitigation(&self, raw_value: f64) -> Result<f64> {
let avg_readout_error: f64 = self.noise_model.readout_errors.values().sum::<f64>()
/ self.noise_model.readout_errors.len().max(1) as f64;
let corrected = (raw_value - avg_readout_error) / (1.0 - 2.0 * avg_readout_error);
Ok(corrected.clamp(-1.0, 1.0))
}
pub fn calibrate_readout(&mut self, n_qubits: usize) -> Result<()> {
let dim = 1 << n_qubits;
let mut cal_matrix = Array2::<f64>::eye(dim);
for q in 0..n_qubits {
if let Some(&err) = self.noise_model.readout_errors.get(&q) {
for i in 0..dim {
let bit = (i >> q) & 1;
if bit == 0 {
cal_matrix[[i, i]] *= 1.0 - err;
} else {
cal_matrix[[i, i]] *= 1.0 - err;
}
}
}
}
self.readout_calibration = Some(cal_matrix);
Ok(())
}
}
#[derive(Debug)]
pub struct NoiseAwareTrainer {
pub gradient_estimator: NoiseAwareGradient,
pub expectation_estimator: MitigatedExpectation,
pub history: TrainingHistory,
}
#[derive(Debug, Clone, Default)]
pub struct TrainingHistory {
pub losses: Vec<f64>,
pub gradient_norms: Vec<f64>,
pub mitigation_improvement: Vec<f64>,
}
impl NoiseAwareTrainer {
pub fn new(noise_model: NoiseModel) -> Self {
Self {
gradient_estimator: NoiseAwareGradient::new(noise_model.clone()),
expectation_estimator: MitigatedExpectation::new(noise_model),
history: TrainingHistory::default(),
}
}
pub fn step<F>(&mut self, params: &mut [f64], loss_fn: F, learning_rate: f64) -> Result<f64>
where
F: Fn(&[f64]) -> Result<f64> + Clone,
{
let (gradients, variances) = self
.gradient_estimator
.compute_all_gradients(params, loss_fn.clone())?;
let grad_norm: f64 = gradients.iter().map(|g| g * g).sum::<f64>().sqrt();
self.history.gradient_norms.push(grad_norm);
for (i, (param, grad)) in params.iter_mut().zip(gradients.iter()).enumerate() {
let variance_factor = 1.0 / (1.0 + variances[i].sqrt());
*param -= learning_rate * grad * variance_factor;
}
let loss = self.expectation_estimator.compute(|scale| {
let _ = scale; loss_fn(params)
})?;
self.history.losses.push(loss);
Ok(loss)
}
pub fn statistics(&self) -> TrainingStatistics {
let n = self.history.losses.len();
if n == 0 {
return TrainingStatistics::default();
}
let avg_loss = self.history.losses.iter().sum::<f64>() / n as f64;
let avg_grad_norm = self.history.gradient_norms.iter().sum::<f64>() / n as f64;
let recent_loss = self.history.losses.last().copied().unwrap_or(0.0);
TrainingStatistics {
epochs: n,
average_loss: avg_loss,
recent_loss,
average_gradient_norm: avg_grad_norm,
converged: avg_grad_norm < 1e-6,
}
}
}
#[derive(Debug, Clone, Default)]
pub struct TrainingStatistics {
pub epochs: usize,
pub average_loss: f64,
pub recent_loss: f64,
pub average_gradient_norm: f64,
pub converged: bool,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_noise_model_ideal() {
let model = NoiseModel::ideal();
assert_eq!(model.noise_scale, 0.0);
assert!(model.single_qubit_errors.is_empty());
}
#[test]
fn test_noise_model_depolarizing() {
let model = NoiseModel::uniform_depolarizing(4, 0.01, 0.02);
assert_eq!(model.single_qubit_errors.len(), 4);
assert!(!model.two_qubit_errors.is_empty());
}
#[test]
fn test_noise_aware_gradient() {
let model = NoiseModel::uniform_depolarizing(2, 0.01, 0.02);
let mut estimator = NoiseAwareGradient::new(model);
let params = vec![0.5, 0.3];
let (grad, var) = estimator
.compute_gradient(0, ¶ms, |p| Ok(p[0].sin() + p[1].cos()))
.expect("Should compute gradient");
assert!((grad - 0.5_f64.cos()).abs() < 0.3); assert!(var >= 0.0);
}
#[test]
fn test_mitigated_expectation_linear() {
let model = NoiseModel::ideal();
let estimator = MitigatedExpectation::new(model);
let result = estimator
.compute(|scale| Ok(1.0 - 0.1 * scale))
.expect("Should compute");
assert!((result - 1.0).abs() < 0.1);
}
#[test]
fn test_training_statistics() {
let model = NoiseModel::ideal();
let trainer = NoiseAwareTrainer::new(model);
let stats = trainer.statistics();
assert_eq!(stats.epochs, 0);
assert!(!stats.converged);
}
#[test]
fn test_mitigation_methods() {
assert_eq!(MitigationMethod::ZNE, MitigationMethod::ZNE);
assert_ne!(MitigationMethod::ZNE, MitigationMethod::PEC);
}
#[test]
fn test_zne_extrapolation() {
let model = NoiseModel::ideal();
let config = MitigatedExpectationConfig {
method: MitigationMethod::ZNE,
shots: 1000,
zne_scale_factors: vec![1.0, 2.0, 3.0],
zne_extrapolation: ZNEExtrapolation::Linear,
apply_readout_mitigation: false,
};
let estimator = MitigatedExpectation::with_config(model, config);
let result = estimator
.compute(|scale| Ok(1.0 - 0.1 * scale))
.expect("Should succeed");
assert!((result - 1.0).abs() < 0.1);
}
}