use std::{convert::TryFrom, fmt, time::Duration};
use tonic::codec::CompressionEncoding;
#[cfg(feature = "stubs")]
use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pyclass_complex_enum, gen_stub_pymethods};
#[deny(clippy::module_name_repetitions)]
pub use ::pbjson_types::Duration as QpuApiDuration;
use async_trait::async_trait;
use cached::proc_macro::cached;
use derive_builder::Builder;
use qcs_api_client_common::configuration::TokenError;
#[cfg(feature = "grpc-web")]
use qcs_api_client_grpc::tonic::wrap_channel_with_grpc_web;
#[cfg(feature = "tracing")]
use qcs_api_client_grpc::tonic::wrap_channel_with_tracing;
pub use qcs_api_client_grpc::tonic::Error as GrpcError;
use qcs_api_client_grpc::{
get_channel_with_timeout,
models::controller::{
controller_job_execution_result, data_value::Value, ControllerJobExecutionResult,
DataValue, EncryptedControllerJob, JobExecutionConfiguration, RealDataValue,
},
services::controller::{
cancel_controller_jobs_request, controller_client::ControllerClient,
execute_controller_job_request, get_controller_job_results_request,
CancelControllerJobsRequest, ExecuteControllerJobRequest,
ExecutionOptions as InnerApiExecutionOptions, GetControllerJobResultsRequest,
},
tonic::{parse_uri, wrap_channel_with, wrap_channel_with_retry},
};
pub use qcs_api_client_openapi::apis::Error as OpenApiError;
use qcs_api_client_openapi::models::QuantumProcessorAccessorType;
use qcs_api_client_openapi::{
apis::{
endpoints_api::{
get_default_endpoint as api_get_default_endpoint, get_endpoint,
GetDefaultEndpointError, GetEndpointError,
},
quantum_processors_api::{
get_quantum_processor_accessors, GetQuantumProcessorAccessorsError,
},
},
models::QuantumProcessorAccessor,
};
use crate::executable::Parameters;
use crate::client::{GrpcClientError, GrpcConnection, Qcs};
#[cfg(feature = "python")]
pub mod python;
/// Upper bound, in bytes, on the size of an encoded (outbound) controller request: 250 MiB.
///
/// Passed to `max_encoding_message_size` when the controller client is constructed.
const MAX_CONTROLLER_OUTBOUND_REQUEST_SIZE: usize = 250 * 1024 * 1024;
/// Convert a set of patch values into a [`JobExecutionConfiguration`].
///
/// Each `(name, values)` entry of `params` becomes one memory value keyed by `name`, whose
/// payload is a real-valued [`DataValue`] holding a copy of `values`.
pub(crate) fn params_into_job_execution_configuration(
    params: &Parameters,
) -> JobExecutionConfiguration {
    JobExecutionConfiguration {
        memory_values: params
            .iter()
            .map(|(name, data)| {
                let real = RealDataValue { data: data.clone() };
                let wrapped = DataValue {
                    value: Some(Value::Real(real)),
                };
                (name.as_ref().into(), wrapped)
            })
            .collect(),
    }
}
/// Identifier of a job, wrapping the job execution ID string returned on submission.
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)]
#[cfg_attr(feature = "python", derive(pyo3::FromPyObject, pyo3::IntoPyObject))]
pub struct JobId(pub(crate) String);

impl fmt::Display for JobId {
    /// Formats the ID exactly like its underlying string, honoring formatter flags
    /// such as width and alignment.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self(raw) = self;
        fmt::Display::fmt(raw, f)
    }
}

impl From<String> for JobId {
    /// Wraps the given string as a job ID; no validation is performed.
    fn from(value: String) -> Self {
        JobId(value)
    }
}
/// Submit `program` for execution with a single set of patch values and return its job ID.
///
/// This is a thin wrapper over [`submit_with_parameter_batch`] with a one-element batch.
/// It fails with a [`QpuApiError`] when the batch submission fails, or when the service
/// response contains no job execution ID.
#[expect(clippy::missing_errors_doc)]
pub async fn submit(
    quantum_processor_id: Option<&str>,
    program: EncryptedControllerJob,
    patch_values: &Parameters,
    client: &Qcs,
    execution_options: &ExecutionOptions,
) -> Result<JobId, QpuApiError> {
    let mut job_ids = submit_with_parameter_batch(
        quantum_processor_id,
        program,
        std::iter::once(patch_values),
        client,
        execution_options,
    )
    .await?;

    // The last ID is taken, mirroring `Vec::pop`.
    match job_ids.pop() {
        Some(job_id) => Ok(job_id),
        None => Err(QpuApiError::from(GrpcClientError::ResponseEmpty(
            "Job Execution ID".into(),
        ))),
    }
}
/// Submit `program` for execution once per set of patch values in `patch_values`.
///
/// All sets are sent in a single `ExecuteControllerJobRequest`; the job execution IDs in
/// the response are returned as [`JobId`]s in the order the service provided them.
///
/// # Errors
///
/// * [`QpuApiError::EmptyPatchValues`] if `patch_values` yields no items.
/// * Any [`QpuApiError`] raised while establishing the controller client.
/// * A [`GrpcClientError::RequestFailed`] (wrapped in [`QpuApiError`]) if the request fails.
pub async fn submit_with_parameter_batch<'a, I>(
    quantum_processor_id: Option<&str>,
    program: EncryptedControllerJob,
    patch_values: I,
    client: &Qcs,
    execution_options: &ExecutionOptions,
) -> Result<Vec<JobId>, QpuApiError>
where
    I: IntoIterator<Item = &'a Parameters>,
{
    #[cfg(feature = "tracing")]
    tracing::debug!(
        "submitting job to {:?} using options {:?}",
        quantum_processor_id,
        execution_options
    );

    // Convert every parameter set up front; an empty batch is rejected before any
    // connection is attempted.
    let execution_configurations: Vec<_> = patch_values
        .into_iter()
        .map(params_into_job_execution_configuration)
        .collect();
    if execution_configurations.is_empty() {
        return Err(QpuApiError::EmptyPatchValues);
    }

    let request = ExecuteControllerJobRequest {
        execution_configurations,
        job: Some(execute_controller_job_request::Job::Encrypted(program)),
        target: execution_options.get_job_target(quantum_processor_id),
        options: execution_options.api_options().copied(),
    };

    let mut controller_client = execution_options
        .get_controller_client(client, quantum_processor_id)
        .await?;

    let response = controller_client
        .execute_controller_job(request)
        .await
        .map_err(GrpcClientError::RequestFailed)?;

    let job_ids: Vec<JobId> = response
        .into_inner()
        .job_execution_ids
        .into_iter()
        .map(JobId)
        .collect();
    Ok(job_ids)
}
pub async fn cancel_jobs(
job_ids: Vec<JobId>,
quantum_processor_id: Option<&str>,
client: &Qcs,
execution_options: &ExecutionOptions,
) -> Result<(), QpuApiError> {
let mut controller_client = execution_options
.get_controller_client(client, quantum_processor_id)
.await?;
let request = CancelControllerJobsRequest {
job_ids: job_ids.into_iter().map(|id| id.0).collect(),
target: execution_options.get_cancel_target(quantum_processor_id),
};
controller_client
.cancel_controller_jobs(request)
.await
.map_err(GrpcClientError::RequestFailed)?;
Ok(())
}
/// Request cancellation of a single job.
///
/// Equivalent to calling [`cancel_jobs`] with a one-element list.
///
/// # Errors
///
/// Returns whatever [`cancel_jobs`] returns.
pub async fn cancel_job(
    job_id: JobId,
    quantum_processor_id: Option<&str>,
    client: &Qcs,
    execution_options: &ExecutionOptions,
) -> Result<(), QpuApiError> {
    let single_job = Vec::from([job_id]);
    cancel_jobs(single_job, quantum_processor_id, client, execution_options).await
}
/// Fetch the execution result of the job identified by `job_id`.
///
/// The result is returned only when its status decodes to `Status::Success`. Otherwise
/// this fails with a [`QpuApiError`]:
///
/// * the controller client could not be established, or the request itself failed;
/// * the response carried no result (`GrpcClientError::ResponseEmpty`);
/// * the status decoded to anything other than success
///   ([`QpuApiError::JobExecutionFailed`], carrying the status name and message);
/// * the raw status value could not be decoded ([`QpuApiError::InvalidJobStatus`]).
#[expect(clippy::missing_errors_doc)]
pub async fn retrieve_results(
    job_id: JobId,
    quantum_processor_id: Option<&str>,
    client: &Qcs,
    execution_options: &ExecutionOptions,
) -> Result<ControllerJobExecutionResult, QpuApiError> {
    #[cfg(feature = "tracing")]
    tracing::debug!(
        "retrieving job results for {} on {:?} using options {:?}",
        job_id,
        quantum_processor_id,
        execution_options,
    );

    let request = GetControllerJobResultsRequest {
        job_execution_id: job_id.0,
        target: execution_options.get_results_target(quantum_processor_id),
    };

    let mut controller_client = execution_options
        .get_controller_client(client, quantum_processor_id)
        .await?;

    let result = controller_client
        .get_controller_job_results(request)
        .await
        .map_err(GrpcClientError::RequestFailed)?
        .into_inner()
        .result
        .ok_or_else(|| GrpcClientError::ResponseEmpty("Job Execution Results".into()))?;

    match controller_job_execution_result::Status::try_from(result.status) {
        Ok(controller_job_execution_result::Status::Success) => Ok(result),
        Ok(status) => Err(QpuApiError::JobExecutionFailed {
            status: status.as_str_name().to_string(),
            // Build the fallback lazily so no string is allocated when the service
            // did provide a message.
            message: result
                .status_message
                .unwrap_or_else(|| "No message provided.".to_string()),
        }),
        Err(err) => Err(QpuApiError::InvalidJobStatus {
            status: result.status,
            message: err.to_string(),
        }),
    }
}
/// Alias for [`ExecutionOptions`].
// NOTE(review): presumably kept as an alternative/legacy name for callers — confirm.
pub type QpuConnectionOptions = ExecutionOptions;
/// Alias for [`ExecutionOptionsBuilder`].
pub type QpuConnectionOptionsBuilder = ExecutionOptionsBuilder;
/// Options describing how to reach a QPU and how to execute on it.
///
/// Holds the connection strategy, the request timeout, and optional API-level execution
/// options. Construct it with the derived `ExecutionOptionsBuilder`; unset fields fall back
/// to the default connection strategy, a 30 second timeout, and no API options.
#[derive(Builder, Clone, Debug, PartialEq)]
// Attributes applied when the `stubs` feature is disabled.
#[cfg_attr(
not(feature = "stubs"),
builder_struct_attr(optipy::strip_pyo3(only_stubs)),
builder_field_attr(stub_gen(skip)),
optipy::strip_pyo3(only_stubs)
)]
// Attributes applied when the `python` feature is disabled.
#[cfg_attr(
not(feature = "python"),
builder_struct_attr(optipy::strip_pyo3),
optipy::strip_pyo3
)]
// With `stubs` enabled, both this struct and its builder are annotated with `gen_stub_pyclass`.
#[cfg_attr(
feature = "stubs",
builder_struct_attr(gen_stub_pyclass),
gen_stub_pyclass
)]
// With `python` enabled, both this struct and its builder become pyo3 classes in
// `qcs_sdk.qpu.api`; only this struct requests `eq`.
#[cfg_attr(
feature = "python",
builder_struct_attr(pyo3::pyclass(module = "qcs_sdk.qpu.api")),
pyo3::pyclass(module = "qcs_sdk.qpu.api", eq)
)]
pub struct ExecutionOptions {
#[pyo3(get)]
#[doc = "The [`ConnectionStrategy`] to use to establish a connection to the QPU."]
#[builder(default)]
connection_strategy: ConnectionStrategy,
#[doc = "The timeout to use for the request, defaults to 30 seconds. If set to `None`, then there is no timeout."]
#[builder(default = "Some(Duration::from_secs(30))")]
timeout: Option<Duration>,
#[doc = "Options available when executing a job on a QPU, particular to the execution service's API."]
#[builder(default = "None")]
api_options: Option<InnerApiExecutionOptions>,
}
impl Default for ExecutionOptions {
    /// Returns the options produced by a builder on which nothing has been set.
    ///
    /// Panics if the builder fails to build; the `expect` message states that this is
    /// not expected to happen.
    fn default() -> Self {
        let builder = ExecutionOptionsBuilder::default();
        builder.build().expect(
            "Should be able to derive a default set of the ExecutionOptions from the builder.",
        )
    }
}
// `PartialEq` is derived on the struct; this marker impl additionally declares `Eq`.
// NOTE(review): presumably written by hand because a field type (likely
// `InnerApiExecutionOptions`) does not itself implement `Eq` — confirm.
impl Eq for ExecutionOptions {}
/// Wrapper around the execution service's own options type (`InnerApiExecutionOptions`).
///
/// With the `python` feature enabled it is exposed as the class `APIExecutionOptions`, and
/// its derived builder as `APIExecutionOptionsBuilder`, both in `qcs_sdk.qpu.api`.
#[derive(Builder, Clone, Copy, Debug, Default, PartialEq)]
#[cfg_attr(
feature = "stubs",
builder_struct_attr(gen_stub_pyclass),
gen_stub_pyclass
)]
#[cfg_attr(
feature = "python",
builder_struct_attr(pyo3::pyclass(
name = "APIExecutionOptionsBuilder",
module = "qcs_sdk.qpu.api"
)),
pyo3::pyclass(name = "APIExecutionOptions", module = "qcs_sdk.qpu.api")
)]
#[allow(clippy::module_name_repetitions)]
pub struct ApiExecutionOptions {
// The wrapped API options; the accessors on this type read directly from it.
inner: InnerApiExecutionOptions,
}
// `PartialEq` is derived on the struct; this marker impl additionally declares `Eq`.
// NOTE(review): presumably written by hand because `InnerApiExecutionOptions` does not
// itself implement `Eq` — confirm.
impl Eq for ApiExecutionOptions {}
#[cfg_attr(not(feature = "python"), optipy::strip_pyo3)]
#[cfg_attr(feature = "stubs", gen_stub_pymethods)]
#[cfg_attr(feature = "python", pyo3::pymethods)]
impl ApiExecutionOptions {
    /// Create a builder with nothing set yet.
    #[staticmethod]
    #[must_use]
    pub fn builder() -> ApiExecutionOptionsBuilder {
        Default::default()
    }

    /// The value of the `bypass_settings_protection` flag on the wrapped API options.
    #[getter]
    #[must_use]
    pub fn bypass_settings_protection(&self) -> bool {
        let Self { inner } = self;
        inner.bypass_settings_protection
    }
}
impl ApiExecutionOptions {
    /// The `timeout` set on the wrapped API options, if any.
    #[must_use]
    pub fn timeout(&self) -> Option<::pbjson_types::Duration> {
        let Self { inner } = self;
        inner.timeout
    }
}
impl From<ApiExecutionOptions> for InnerApiExecutionOptions {
fn from(options: ApiExecutionOptions) -> Self {
options.inner
}
}
impl From<InnerApiExecutionOptions> for ApiExecutionOptions {
fn from(inner: InnerApiExecutionOptions) -> Self {
Self { inner }
}
}
impl ApiExecutionOptionsBuilder {
    /// Set the `bypass_settings_protection` flag on the options being built.
    ///
    /// If no options have been stored in the builder yet, they start from
    /// `InnerApiExecutionOptions::default()`.
    pub fn bypass_settings_protection(&mut self, bypass_settings_protection: bool) -> &mut Self {
        let options = self
            .inner
            .get_or_insert_with(InnerApiExecutionOptions::default);
        options.bypass_settings_protection = bypass_settings_protection;
        self
    }

    /// Set (or clear, with `None`) the `timeout` on the options being built.
    ///
    /// If no options have been stored in the builder yet, they start from
    /// `InnerApiExecutionOptions::default()`.
    pub fn timeout(&mut self, timeout: Option<::pbjson_types::Duration>) -> &mut Self {
        let options = self
            .inner
            .get_or_insert_with(InnerApiExecutionOptions::default);
        options.timeout = timeout;
        self
    }
}
impl ExecutionOptions {
#[must_use]
pub fn builder() -> ExecutionOptionsBuilder {
ExecutionOptionsBuilder::default()
}
#[must_use]
pub fn connection_strategy(&self) -> &ConnectionStrategy {
&self.connection_strategy
}
#[must_use]
pub fn timeout(&self) -> Option<Duration> {
self.timeout
}
#[must_use]
pub fn api_options(&self) -> Option<&InnerApiExecutionOptions> {
self.api_options.as_ref()
}
}
/// How the address of the QPU's gRPC service is determined.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "stubs", gen_stub_pyclass_complex_enum)]
#[cfg_attr(feature = "python", pyo3::pyclass(module = "qcs_sdk.qpu.api", eq))]
pub enum ConnectionStrategy {
    /// Connect through a gateway accessor looked up for the quantum processor.
    Gateway(),
    /// Connect to the quantum processor's default endpoint.
    DirectAccess(),
    /// Connect to the endpoint with the given ID.
    EndpointId(String),
    /// Connect to the given address as-is, without any lookup.
    EndpointAddress(String),
}

impl Default for ConnectionStrategy {
    /// The default strategy is [`ConnectionStrategy::Gateway`].
    fn default() -> Self {
        ConnectionStrategy::Gateway()
    }
}
/// Behavior shared by anything that describes how to reach a QPU for execution.
///
/// Implementors provide [`connection_strategy`](Self::connection_strategy) and
/// [`timeout`](Self::timeout); every other method has a default implementation built on
/// those two.
#[async_trait]
pub trait ExecutionTarget<'a> {
/// The strategy used to choose the address and the request target.
fn connection_strategy(&'a self) -> &'a ConnectionStrategy;
/// The timeout handed to `get_channel_with_timeout` when the gRPC channel is created.
fn timeout(&self) -> Option<Duration>;
/// Target to place on a job submission request.
///
/// With [`ConnectionStrategy::EndpointId`] the endpoint ID is the target. With every other
/// strategy the target is `quantum_processor_id`, or `None` when no ID was given.
fn get_job_target(
&'a self,
quantum_processor_id: Option<&str>,
) -> Option<execute_controller_job_request::Target> {
match self.connection_strategy() {
ConnectionStrategy::EndpointId(endpoint_id) => Some(
execute_controller_job_request::Target::EndpointId(endpoint_id.clone()),
),
ConnectionStrategy::Gateway()
| ConnectionStrategy::DirectAccess()
| ConnectionStrategy::EndpointAddress(_) => quantum_processor_id
.map(String::from)
.map(execute_controller_job_request::Target::QuantumProcessorId),
}
}
/// Target to place on a results retrieval request.
///
/// Chosen by the same rule as [`get_job_target`](Self::get_job_target).
fn get_results_target(
&'a self,
quantum_processor_id: Option<&str>,
) -> Option<get_controller_job_results_request::Target> {
match self.connection_strategy() {
ConnectionStrategy::EndpointId(endpoint_id) => Some(
get_controller_job_results_request::Target::EndpointId(endpoint_id.clone()),
),
ConnectionStrategy::Gateway()
| ConnectionStrategy::DirectAccess()
| ConnectionStrategy::EndpointAddress(_) => quantum_processor_id
.map(String::from)
.map(get_controller_job_results_request::Target::QuantumProcessorId),
}
}
/// Target to place on a job cancellation request.
///
/// Chosen by the same rule as [`get_job_target`](Self::get_job_target).
fn get_cancel_target(
&'a self,
quantum_processor_id: Option<&str>,
) -> Option<cancel_controller_jobs_request::Target> {
match self.connection_strategy() {
ConnectionStrategy::EndpointId(endpoint_id) => Some(
cancel_controller_jobs_request::Target::EndpointId(endpoint_id.clone()),
),
ConnectionStrategy::Gateway()
| ConnectionStrategy::DirectAccess()
| ConnectionStrategy::EndpointAddress(_) => quantum_processor_id
.map(String::from)
.map(cancel_controller_jobs_request::Target::QuantumProcessorId),
}
}
/// Build a [`ControllerClient`] over the connection from
/// [`get_qpu_grpc_connection`](Self::get_qpu_grpc_connection).
///
/// Outbound messages are limited to `MAX_CONTROLLER_OUTBOUND_REQUEST_SIZE` bytes, inbound
/// messages to `u32::MAX` bytes, and gzip is enabled for both sending and accepting.
/// Fails with whatever error establishing the connection produced.
async fn get_controller_client(
&'a self,
client: &Qcs,
quantum_processor_id: Option<&str>,
) -> Result<ControllerClient<GrpcConnection>, QpuApiError> {
let service = self
.get_qpu_grpc_connection(client, quantum_processor_id)
.await?;
Ok(ControllerClient::new(service)
.max_encoding_message_size(MAX_CONTROLLER_OUTBOUND_REQUEST_SIZE)
.max_decoding_message_size(u32::MAX as usize)
.accept_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Gzip))
}
/// Resolve the gRPC address for the current strategy and open a connection to it.
///
/// * `EndpointId`: fetches the endpoint and uses its gRPC address, failing with
///   [`QpuApiError::EndpointNotFound`] if it has none.
/// * `Gateway` / `DirectAccess`: require `quantum_processor_id`
///   ([`QpuApiError::MissingQpuId`] otherwise) and look the address up.
/// * `EndpointAddress`: uses the stored address as-is.
async fn get_qpu_grpc_connection(
&'a self,
client: &Qcs,
quantum_processor_id: Option<&str>,
) -> Result<GrpcConnection, QpuApiError> {
let address = match self.connection_strategy() {
ConnectionStrategy::EndpointId(endpoint_id) => {
let endpoint = get_endpoint(&client.get_openapi_client(), endpoint_id).await?;
endpoint
.addresses
.grpc
.ok_or_else(|| QpuApiError::EndpointNotFound(endpoint_id.into()))?
}
ConnectionStrategy::Gateway() => {
self.get_gateway_address(
quantum_processor_id.ok_or(QpuApiError::MissingQpuId)?,
client,
)
.await?
}
ConnectionStrategy::DirectAccess() => {
self.get_default_endpoint_address(
quantum_processor_id.ok_or(QpuApiError::MissingQpuId)?,
client,
)
.await?
}
ConnectionStrategy::EndpointAddress(address) => address.clone(),
};
self.grpc_address_to_channel(&address, client)
}
/// Turn `address` into a [`GrpcConnection`].
///
/// The address is parsed as a URI and a channel is created using
/// [`timeout`](Self::timeout); either step failing yields [`QpuApiError::GrpcError`].
/// The channel is then wrapped in layers, in this order: tracing (only with the `tracing`
/// feature), the client's configuration, retry, and gRPC-web (only with the `grpc-web`
/// feature).
#[expect(clippy::missing_errors_doc, clippy::result_large_err)]
fn grpc_address_to_channel(
&self,
address: &str,
client: &Qcs,
) -> Result<GrpcConnection, QpuApiError> {
let uri = parse_uri(address).map_err(QpuApiError::GrpcError)?;
let channel = get_channel_with_timeout(uri, self.timeout())
.map_err(|err| QpuApiError::GrpcError(err.into()))?;
#[cfg(feature = "tracing")]
let channel = wrap_channel_with_tracing(
channel,
address.to_string(),
client
.get_config()
.tracing_configuration()
.cloned()
.unwrap_or_default(),
);
// NOTE(review): presumably this layer attaches credentials from the client
// configuration to each request — confirm against `wrap_channel_with`.
let channel = wrap_channel_with(channel, client.get_config().clone());
let channel = wrap_channel_with_retry(channel);
#[cfg(feature = "grpc-web")]
let channel = wrap_channel_with_grpc_web(channel);
Ok(channel)
}
/// Look up the gateway address for `quantum_processor_id` via the cached accessor lookup.
async fn get_gateway_address(
&self,
quantum_processor_id: &str,
client: &Qcs,
) -> Result<String, QpuApiError> {
get_accessor_with_cache(quantum_processor_id, client).await
}
/// Look up the default endpoint address for `quantum_processor_id` via the cached
/// default-endpoint lookup.
async fn get_default_endpoint_address(
&self,
quantum_processor_id: &str,
client: &Qcs,
) -> Result<String, QpuApiError> {
get_default_endpoint_with_cache(quantum_processor_id, client).await
}
}
#[async_trait]
impl<'a> ExecutionTarget<'a> for ExecutionOptions {
fn connection_strategy(&'a self) -> &'a ConnectionStrategy {
self.connection_strategy()
}
fn timeout(&self) -> Option<Duration> {
self.timeout()
}
}
/// Cached wrapper around [`get_accessor`].
///
/// The cache key is the quantum processor ID alone, so `client` does not take part in
/// the lookup. The body runs only when the cache has no usable entry for that ID.
#[cached(
    result = true,
    time = 60,
    time_refresh = true,
    sync_writes = true,
    key = "String",
    convert = r"{ String::from(quantum_processor_id)}"
)]
async fn get_accessor_with_cache(
    quantum_processor_id: &str,
    client: &Qcs,
) -> Result<String, QpuApiError> {
    #[cfg(feature = "tracing")]
    tracing::info!(quantum_processor_id=%quantum_processor_id, "get_accessor cache miss");
    let gateway_address = get_accessor(quantum_processor_id, client).await?;
    Ok(gateway_address)
}
/// Fetch the accessors of `quantum_processor_id` and return the URL of the one chosen by
/// [`select_min_accessor`].
///
/// Fails with [`QpuApiError::GatewayNotFound`] when no accessor qualifies, or with the
/// request error when the accessors cannot be fetched.
async fn get_accessor(quantum_processor_id: &str, client: &Qcs) -> Result<String, QpuApiError> {
    let response =
        get_quantum_processor_accessors(&client.get_openapi_client(), quantum_processor_id).await?;
    match select_min_accessor(response.accessors) {
        Some(gateway) => Ok(gateway.url),
        None => Err(QpuApiError::GatewayNotFound(
            quantum_processor_id.to_string(),
        )),
    }
}
/// Pick the live `GatewayV1` accessor with the lowest rank.
///
/// Accessors without a rank sort as `i64::MAX`, i.e. after every ranked one. Returns
/// `None` when no accessor is both live and of type `GatewayV1`.
fn select_min_accessor(
    accessors: Vec<QuantumProcessorAccessor>,
) -> Option<QuantumProcessorAccessor> {
    let is_live_gateway = |candidate: &QuantumProcessorAccessor| {
        candidate.live && candidate.access_type == QuantumProcessorAccessorType::GatewayV1
    };
    let rank_or_last = |candidate: &QuantumProcessorAccessor| candidate.rank.unwrap_or(i64::MAX);

    accessors
        .into_iter()
        .filter(is_live_gateway)
        .min_by_key(rank_or_last)
}
/// Cached wrapper around [`get_default_endpoint`].
///
/// The cache key is the quantum processor ID alone, so `client` does not take part in
/// the lookup. The body runs only when the cache has no usable entry for that ID.
#[cached(
    result = true,
    time = 60,
    time_refresh = true,
    sync_writes = true,
    key = "String",
    convert = r"{ String::from(quantum_processor_id)}"
)]
async fn get_default_endpoint_with_cache(
    quantum_processor_id: &str,
    client: &Qcs,
) -> Result<String, QpuApiError> {
    #[cfg(feature = "tracing")]
    tracing::info!(quantum_processor_id=%quantum_processor_id, "get_default_endpoint cache miss");
    let endpoint_address = get_default_endpoint(quantum_processor_id, client).await?;
    Ok(endpoint_address)
}
/// Fetch the default endpoint of `quantum_processor_id` and return its gRPC address.
///
/// Fails with [`QpuApiError::QpuEndpointNotFound`] when the endpoint has no gRPC address,
/// or with the request error when the endpoint cannot be fetched.
async fn get_default_endpoint(
    quantum_processor_id: &str,
    client: &Qcs,
) -> Result<String, QpuApiError> {
    let endpoint =
        api_get_default_endpoint(&client.get_openapi_client(), quantum_processor_id).await?;
    let grpc_address = endpoint.addresses.grpc;
    match grpc_address {
        Some(address) => Ok(address),
        None => Err(QpuApiError::QpuEndpointNotFound(
            quantum_processor_id.into(),
        )),
    }
}
/// Errors that can occur while connecting to a QPU or while submitting, cancelling, or
/// retrieving jobs through this module.
#[derive(Debug, thiserror::Error)]
pub enum QpuApiError {
/// The gRPC address could not be parsed or the channel could not be created.
#[error("Error configuring gRPC request: {0}")]
GrpcError(#[from] GrpcError<TokenError>),
/// The endpoint looked up by ID has no gRPC address; carries the endpoint ID.
#[error("Missing gRPC endpoint for endpoint ID: {0}")]
EndpointNotFound(String),
/// The quantum processor's default endpoint has no gRPC address; carries the processor ID.
#[error("Missing gRPC endpoint for quantum processor: {0}")]
QpuEndpointNotFound(String),
/// The request for the quantum processor's default endpoint failed.
#[error("Failed to get endpoint for quantum processor: {0}")]
QpuEndpointRequestFailed(#[from] OpenApiError<GetDefaultEndpointError>),
/// The request for the quantum processor's accessors failed.
#[error("Failed to get accessors for quantum processor: {0}")]
AccessorRequestFailed(#[from] OpenApiError<GetQuantumProcessorAccessorsError>),
/// No live `GatewayV1` accessor exists for the quantum processor; carries the processor ID.
#[error("No gateway found for quantum processor: {0}")]
GatewayNotFound(String),
/// The request for an endpoint by ID failed.
#[error("Failed to get endpoint for the given ID: {0}")]
EndpointRequestFailed(#[from] OpenApiError<GetEndpointError>),
/// An error from the gRPC client, such as a failed request or an empty response.
#[error(transparent)]
GrpcClientError(#[from] GrpcClientError),
/// The connection strategy needs a quantum processor ID but none was given.
#[error("A quantum processor ID must be provided if not connecting directly to an endpoint ID with ConnectionStrategy::EndpointId or ConnectionStrategy::EndpointAddress")]
MissingQpuId,
/// A batch submission was attempted with no sets of patch values.
#[error("Submitting a job requires at least one set of patch values")]
EmptyPatchValues,
/// The job finished with a status other than success.
#[error("The submitted job failed with status: {status}. {message}")]
JobExecutionFailed {
status: String,
message: String,
},
/// A status code could not be decoded.
// NOTE(review): not constructed anywhere in the visible part of this file.
#[error("The status code could not be decoded: {0}")]
StatusCodeDecode(String),
/// The raw job status value did not map to a known status.
#[error("The request returned an invalid status: {status}. {message}")]
InvalidJobStatus {
status: i32,
message: String,
},
}
#[cfg(test)]
mod test {
    use super::*;

    /// `ExecutionOptions::default()` must equal what an untouched builder produces.
    #[test]
    fn test_default_execution_options() {
        let from_builder = ExecutionOptionsBuilder::default().build().unwrap();
        assert_eq!(ExecutionOptions::default(), from_builder);
    }

    /// A lone live `GatewayV1` accessor is selected even though it has no rank.
    #[test]
    fn test_select_min_accessor_prefers_some_to_none() {
        let unranked = QuantumProcessorAccessor {
            live: true,
            access_type: QuantumProcessorAccessorType::GatewayV1,
            rank: None,
            url: "url".to_string(),
        };
        let selected = select_min_accessor(vec![unranked.clone()]);
        assert_eq!(unranked, selected.expect("expected Some accessor"));
    }
}