use std::{convert::TryFrom, ops::BitXor};
use num::{complex::Complex64, ToPrimitive};
use quil_rs::instruction::{ExternParameter, ExternParameterType, ExternSignature};
use quil_rs::{
instruction::{Call, CallError, ExternError, UnresolvedCallArgument},
quil::ToQuilError,
};
#[cfg(feature = "stubs")]
use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pyfunction, gen_stub_pymethods};
/// 48-bit mask (`2^48 - 1`).
///
/// Serves as the inclusive upper bound for seed values and as the mask applied to the LFSR
/// state after each left shift, so the state never grows beyond 48 bits.
const MAX_SEQUENCER_VALUE: u64 = 0x0000_FFFF_FFFF_FFFF;
/// Mask selecting the low 16 bits of a PRNG value before it is reduced modulo the sub-region
/// count.
const MAX_UNSIGNED_MULTIPLIER: u64 = 0x0000_0000_0000_FFFF;
/// Zero-based bit positions of the LFSR state that are XORed together to form the feedback bit
/// of the "v1" generator.
const V1_TAPS: [u32; 4] = [47, 46, 20, 19];
/// Errors produced while validating seeds and memory regions or while building the extern
/// signature and call in this module.
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// The seed was rejected by `PrngSeedValue::try_new`; carries the offending value.
#[error(
"seed values must be in range [1, {MAX_SEQUENCER_VALUE}] and losslessly convertible to f64, found {0}"
)]
InvalidSeed(u64),
/// Wraps a [`ToQuilError`] (converted automatically via `From`).
#[error("error converting to Quil: {0}")]
ToQuilError(#[from] ToQuilError),
/// Wraps an [`ExternError`] (converted automatically via `From`).
#[error("error constructing extern signature: {0}")]
ExternSignatureError(#[from] ExternError),
/// The destination declaration's scalar type was not `Real`.
#[error("destination must be a REAL[], found {destination_type:?}")]
InvalidDestinationType {
// The scalar type that was found on the destination declaration.
destination_type: quil_rs::instruction::ScalarType,
},
/// The source declaration's scalar type was not `Real`.
#[error("source must be a REAL[], found {source_type:?}")]
InvalidSourceType {
// The scalar type that was found on the source declaration.
source_type: quil_rs::instruction::ScalarType,
},
/// The destination length failed validation against the sub-region size.
///
/// The upper bound printed in the message is `2^f64::MANTISSA_DIGITS - 1`.
#[error(
"destination length must be in range [0, {}] and divisible by the sub-region size, found {destination_length} % {sub_region_size}", 2u64.pow(f64::MANTISSA_DIGITS) - 1
)]
InvalidDestinationLength {
// Declared length of the destination region.
destination_length: u64,
// Sub-region size the length was checked against.
sub_region_size: f64,
},
/// The source length failed validation against the sub-region size.
///
/// The upper bound printed in the message is `2^f64::MANTISSA_DIGITS - 1`.
#[error(
"source length must be in range [0, {}] and divisible by the sub-region size, found {source_length} % {sub_region_size}", 2u64.pow(f64::MANTISSA_DIGITS) - 1
)]
InvalidSourceLength {
// Declared length of the source region.
source_length: u64,
// Sub-region size the length was checked against.
sub_region_size: f64,
},
}
/// Convenience alias for results whose error type is this module's [`Error`].
#[allow(clippy::module_name_repetitions)]
pub type RandomResult<T> = Result<T, Error>;
/// Validated arguments for a call to the `choose_random_real_sub_regions` extern function.
///
/// Instances are built with `try_new`, which checks the memory declarations, and can be
/// converted into a quil-rs `Call` via `TryFrom`.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "stubs", gen_stub_pyclass)]
#[pyo3::pyclass(module = "qcs_sdk.qpu.experimental.random", frozen)]
pub struct ChooseRandomRealSubRegions {
// Name of the destination memory region (taken from the destination declaration).
destination_memory_region_name: String,
// Name of the source memory region (taken from the source declaration).
source_memory_region_name: String,
// Sub-region size; passed to the call as the real part of an immediate value.
sub_region_size: f64,
// Name of the memory region referenced by the seed (the reference's index is not stored).
seed_memory_region_name: String,
}
impl ChooseRandomRealSubRegions {
pub fn try_new<T: Into<f64> + Copy>(
destination: &quil_rs::instruction::Declaration,
source: &quil_rs::instruction::Declaration,
sub_region_size: T,
seed: &quil_rs::instruction::MemoryReference,
) -> RandomResult<Self> {
if !matches!(
destination.size.data_type,
quil_rs::instruction::ScalarType::Real
) {
return Err(Error::InvalidDestinationType {
destination_type: destination.size.data_type,
});
}
if !matches!(
source.size.data_type,
quil_rs::instruction::ScalarType::Real
) {
return Err(Error::InvalidSourceType {
source_type: source.size.data_type,
});
}
if destination
.size
.length
.to_f64()
.is_none_or(|destination_length| destination_length % sub_region_size.into() != 0f64)
{
return Err(Error::InvalidDestinationLength {
destination_length: destination.size.length,
sub_region_size: sub_region_size.into(),
});
}
if source
.size
.length
.to_f64()
.is_none_or(|source_length| source_length % sub_region_size.into() != 0f64)
{
return Err(Error::InvalidSourceLength {
source_length: source.size.length,
sub_region_size: sub_region_size.into(),
});
}
Ok(Self {
destination_memory_region_name: destination.name.clone(),
source_memory_region_name: source.name.clone(),
sub_region_size: sub_region_size.into(),
seed_memory_region_name: seed.name.clone(),
})
}
}
// NOTE(review): the other `pymethods` impl in this file applies `optipy::strip_pyo3` under
// `not(feature = "python")`, whereas this one uses `not(feature = "stubs")` with `only_stubs`;
// confirm the difference is intentional.
#[cfg_attr(not(feature = "stubs"), optipy::strip_pyo3(only_stubs))]
#[cfg_attr(feature = "stubs", gen_stub_pymethods)]
#[cfg_attr(feature = "python", pyo3::pymethods)]
impl ChooseRandomRealSubRegions {
/// Name of the extern function that calls built from this type refer to.
///
/// Annotated as a class attribute named `NAME` for the Python bindings.
#[classattr]
#[pyo3(name = "NAME")]
pub const EXTERN_NAME: &str = "choose_random_real_sub_regions";
}
impl ChooseRandomRealSubRegions {
    /// Build the [`ExternSignature`] describing the `choose_random_real_sub_regions` extern.
    ///
    /// The signature has no return type and four parameters, in order: `destination`
    /// (variable-length `Real` vector, flag `true`), `source` (variable-length `Real` vector,
    /// flag `false`), `sub_region_size` (`Integer` scalar, flag `false`) and `seed` (`Integer`
    /// scalar, flag `true`).
    #[expect(clippy::missing_panics_doc)]
    #[must_use]
    pub fn build_signature() -> ExternSignature {
        // The two parameter types used below, built on demand.
        let real_vector = || {
            ExternParameterType::VariableLengthVector(quil_rs::instruction::ScalarType::Real)
        };
        let integer_scalar =
            || ExternParameterType::Scalar(quil_rs::instruction::ScalarType::Integer);

        // Every name is a fixed literal, so a construction failure would be a programming error.
        let destination =
            ExternParameter::try_new(String::from("destination"), true, real_vector())
                .expect("`destination` should be a valid identifier");
        let source = ExternParameter::try_new(String::from("source"), false, real_vector())
            .expect("`source` should be a valid identifier");
        let sub_region_size =
            ExternParameter::try_new(String::from("sub_region_size"), false, integer_scalar())
                .expect("`sub_region_size` should be a valid identifier");
        let seed = ExternParameter::try_new(String::from("seed"), true, integer_scalar())
            .expect("`seed` should be a valid identifier");

        ExternSignature::new(None, vec![destination, source, sub_region_size, seed])
    }
}
impl TryFrom<ChooseRandomRealSubRegions> for Call {
type Error = CallError;
fn try_from(value: ChooseRandomRealSubRegions) -> Result<Self, Self::Error> {
Self::try_new(
ChooseRandomRealSubRegions::EXTERN_NAME.to_string(),
vec![
UnresolvedCallArgument::Identifier(value.destination_memory_region_name),
UnresolvedCallArgument::Identifier(value.source_memory_region_name),
UnresolvedCallArgument::Immediate(Complex64 {
re: value.sub_region_size,
im: 0.0,
}),
UnresolvedCallArgument::Identifier(value.seed_memory_region_name),
],
)
}
}
/// A validated PRNG seed, stored both as the original integer and as its `f64` conversion.
///
/// Construct it with `try_new`, which only accepts values in `1..=MAX_SEQUENCER_VALUE`.
#[derive(Debug, Clone, Copy)]
#[cfg_attr(feature = "stubs", gen_stub_pyclass)]
#[pyo3::pyclass(module = "qcs_sdk.qpu.experimental.random", frozen)]
pub struct PrngSeedValue {
// The seed exactly as supplied by the caller.
u64_value: u64,
// The same seed converted to `f64` at construction time.
f64_value: f64,
}
#[cfg_attr(not(feature = "python"), optipy::strip_pyo3)]
#[cfg_attr(feature = "stubs", gen_stub_pymethods)]
#[cfg_attr(feature = "python", pyo3::pymethods)]
impl PrngSeedValue {
#[new]
pub fn try_new(value: u64) -> RandomResult<Self> {
if !(1..=MAX_SEQUENCER_VALUE).contains(&value) {
return Err(Error::InvalidSeed(value));
}
if let Some(f64_value) = value.to_f64() {
Ok(Self {
u64_value: value,
f64_value,
})
} else {
Err(Error::InvalidSeed(value))
}
}
pub(super) fn as_f64(&self) -> f64 {
self.f64_value
}
}
fn lfsr_next(seed: u64, taps: &[u32]) -> u64 {
let feedback_value = taps.iter().fold(0, |acc, tap| {
let base = 2u64.pow(*tap);
let bit = u64::from((seed & base) != 0);
acc.bitxor(bit)
});
((seed << 1) & MAX_SEQUENCER_VALUE) | feedback_value
}
#[must_use]
#[cfg_attr(
feature = "stubs",
gen_stub_pyfunction(module = "qcs_sdk.qpu.experimental.random")
)]
#[pyo3::pyfunction]
pub fn lfsr_v1_next(seed: PrngSeedValue) -> u64 {
lfsr_next(seed.u64_value, &V1_TAPS)
}
/// Generate a window of the "v1" LFSR sequence.
///
/// Starting from `seed` (masked with `MAX_SEQUENCER_VALUE`), the register is advanced
/// `start_index` times and those states are discarded; the next `series_length` states are
/// returned in order. The seed itself is never part of the output.
///
/// The two counts are consumed separately and never summed, so no `u32` overflow can occur
/// regardless of how large `start_index` and `series_length` are.
fn generate_lfsr_v1_sequence(seed: u64, start_index: u32, series_length: u32) -> Vec<u64> {
    let mut lfsr = seed & MAX_SEQUENCER_VALUE;

    // Skip the states that precede the requested window.
    for _ in 0..start_index {
        lfsr = lfsr_next(lfsr, &V1_TAPS);
    }

    // The range has an exact size hint, so `collect` allocates the output once.
    (0..series_length)
        .map(|_| {
            lfsr = lfsr_next(lfsr, &V1_TAPS);
            lfsr
        })
        .collect()
}
fn prng_value_to_sub_region_index(value: u64, sub_region_count: u8) -> u8 {
((value & MAX_UNSIGNED_MULTIPLIER) % u64::from(sub_region_count))
.to_u8()
.expect("modulo u8 should always produce a valid value")
}
/// Compute the sub-region indices selected for a window of the "v1" LFSR sequence.
///
/// The sequence is generated from `seed` for the window described by `start_index` and
/// `series_length`, and every generated value is mapped to an index in `0..sub_region_count`.
///
/// # Panics
///
/// Panics if `sub_region_count` is zero and at least one value is generated, because the
/// index is computed with a remainder operation.
#[must_use]
#[cfg_attr(
    feature = "stubs",
    gen_stub_pyfunction(module = "qcs_sdk.qpu.experimental.random")
)]
#[pyo3::pyfunction]
pub fn choose_random_real_sub_region_indices(
    seed: PrngSeedValue,
    start_index: u32,
    series_length: u32,
    sub_region_count: u8,
) -> Vec<u8> {
    let prng_values = generate_lfsr_v1_sequence(seed.u64_value, start_index, series_length);
    let to_index = |prng_value: u64| prng_value_to_sub_region_index(prng_value, sub_region_count);
    prng_values.into_iter().map(to_index).collect()
}
#[cfg(test)]
mod tests {
    use std::{collections::HashMap, fs::File};

    /// Load the reference cases from `tests/prng_test_cases.json`.
    ///
    /// The map is keyed by shot count; each entry is a list of `(seed, expected)` pairs.
    fn prng_sequences() -> HashMap<u32, Vec<(u64, u64)>> {
        let path = concat!(env!("CARGO_MANIFEST_DIR"), "/tests/prng_test_cases.json");
        let fixture = File::open(path).unwrap();
        serde_json::de::from_reader(fixture).unwrap()
    }

    /// For every reference case, the value produced after `num_shots` LFSR steps must equal
    /// the expected value recorded in the fixture.
    #[test]
    fn test_lfsr_v1_next() {
        let cases = prng_sequences();
        for (num_shots, sequences) in cases {
            // Requesting a window of length one at `num_shots - 1` yields the final value.
            let last_index = num_shots - 1;
            for (seed, expected) in sequences {
                let sequence = super::generate_lfsr_v1_sequence(seed, last_index, 1);
                assert_eq!(sequence.len(), 1);
                let end_of_sequence = *sequence.first().unwrap();
                assert_eq!(
                    end_of_sequence, expected,
                    "seed={seed}, num_shots={num_shots}",
                );
            }
        }
    }
}