1use std::{convert::TryFrom, fmt, time::Duration};
5
6#[deny(clippy::module_name_repetitions)]
7pub use ::pbjson_types::Duration as QpuApiDuration;
8use async_trait::async_trait;
9use cached::proc_macro::cached;
10use derive_builder::Builder;
11use qcs_api_client_common::configuration::TokenError;
12#[cfg(feature = "grpc-web")]
13use qcs_api_client_grpc::tonic::wrap_channel_with_grpc_web;
14pub use qcs_api_client_grpc::tonic::Error as GrpcError;
15use qcs_api_client_grpc::{
16 get_channel_with_timeout,
17 models::controller::{
18 controller_job_execution_result, data_value::Value, ControllerJobExecutionResult,
19 DataValue, EncryptedControllerJob, JobExecutionConfiguration, RealDataValue,
20 },
21 services::controller::{
22 cancel_controller_jobs_request, controller_client::ControllerClient,
23 execute_controller_job_request, get_controller_job_results_request,
24 CancelControllerJobsRequest, ExecuteControllerJobRequest,
25 ExecutionOptions as InnerApiExecutionOptions, GetControllerJobResultsRequest,
26 },
27 tonic::{parse_uri, wrap_channel_with, wrap_channel_with_retry},
28};
29pub use qcs_api_client_openapi::apis::Error as OpenApiError;
30use qcs_api_client_openapi::apis::{
31 endpoints_api::{
32 get_default_endpoint as api_get_default_endpoint, get_endpoint, GetDefaultEndpointError,
33 GetEndpointError,
34 },
35 quantum_processors_api::{
36 list_quantum_processor_accessors, ListQuantumProcessorAccessorsError,
37 },
38};
39use qcs_api_client_openapi::models::QuantumProcessorAccessorType;
40
41use crate::executable::Parameters;
42
43use crate::client::{GrpcClientError, GrpcConnection, Qcs};
44
45const MAX_DECODING_MESSAGE_SIZE_BYTES: usize = 250 * 1024 * 1024;
47
48pub(crate) fn params_into_job_execution_configuration(
49 params: &Parameters,
50) -> JobExecutionConfiguration {
51 let memory_values = params
52 .iter()
53 .map(|(str, value)| {
54 (
55 str.as_ref().into(),
56 DataValue {
57 value: Some(Value::Real(RealDataValue {
58 data: value.clone(),
59 })),
60 },
61 )
62 })
63 .collect();
64
65 JobExecutionConfiguration { memory_values }
66}
67
68#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)]
70pub struct JobId(pub(crate) String);
71
72impl fmt::Display for JobId {
73 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
74 <String as fmt::Display>::fmt(&self.0, f)
75 }
76}
77
78impl From<String> for JobId {
79 fn from(value: String) -> Self {
80 Self(value)
81 }
82}
83
84pub async fn submit(
100 quantum_processor_id: Option<&str>,
101 program: EncryptedControllerJob,
102 patch_values: &Parameters,
103 client: &Qcs,
104 execution_options: &ExecutionOptions,
105) -> Result<JobId, QpuApiError> {
106 submit_with_parameter_batch(
107 quantum_processor_id,
108 program,
109 std::iter::once(patch_values),
110 client,
111 execution_options,
112 )
113 .await?
114 .pop()
115 .ok_or_else(|| GrpcClientError::ResponseEmpty("Job Execution ID".into()))
116 .map_err(QpuApiError::from)
117}
118
119pub async fn submit_with_parameter_batch<'a, I>(
141 quantum_processor_id: Option<&str>,
142 program: EncryptedControllerJob,
143 patch_values: I,
144 client: &Qcs,
145 execution_options: &ExecutionOptions,
146) -> Result<Vec<JobId>, QpuApiError>
147where
148 I: IntoIterator<Item = &'a Parameters>,
149{
150 #[cfg(feature = "tracing")]
151 tracing::debug!(
152 "submitting job to {:?} using options {:?}",
153 quantum_processor_id,
154 execution_options
155 );
156
157 let mut patch_values = patch_values.into_iter().peekable();
158 if patch_values.peek().is_none() {
159 return Err(QpuApiError::EmptyPatchValues);
160 }
161
162 let request = ExecuteControllerJobRequest {
163 execution_configurations: patch_values
164 .map(params_into_job_execution_configuration)
165 .collect(),
166 job: Some(execute_controller_job_request::Job::Encrypted(program)),
167 target: execution_options.get_job_target(quantum_processor_id),
168 options: execution_options.api_options().copied(),
169 };
170
171 let mut controller_client = execution_options
172 .get_controller_client(client, quantum_processor_id)
173 .await?;
174
175 Ok(controller_client
176 .execute_controller_job(request)
177 .await
178 .map_err(GrpcClientError::RequestFailed)?
179 .into_inner()
180 .job_execution_ids
181 .into_iter()
182 .map(JobId)
183 .collect())
184}
185
186pub async fn cancel_jobs(
208 job_ids: Vec<JobId>,
209 quantum_processor_id: Option<&str>,
210 client: &Qcs,
211 execution_options: &ExecutionOptions,
212) -> Result<(), QpuApiError> {
213 let mut controller_client = execution_options
214 .get_controller_client(client, quantum_processor_id)
215 .await?;
216
217 let request = CancelControllerJobsRequest {
218 job_ids: job_ids.into_iter().map(|id| id.0).collect(),
219 target: execution_options.get_cancel_target(quantum_processor_id),
220 };
221
222 controller_client
223 .cancel_controller_jobs(request)
224 .await
225 .map_err(GrpcClientError::RequestFailed)?;
226
227 Ok(())
228}
229
230pub async fn cancel_job(
252 job_id: JobId,
253 quantum_processor_id: Option<&str>,
254 client: &Qcs,
255 execution_options: &ExecutionOptions,
256) -> Result<(), QpuApiError> {
257 cancel_jobs(
258 vec![job_id],
259 quantum_processor_id,
260 client,
261 execution_options,
262 )
263 .await
264}
265
266pub async fn retrieve_results(
278 job_id: JobId,
279 quantum_processor_id: Option<&str>,
280 client: &Qcs,
281 execution_options: &ExecutionOptions,
282) -> Result<ControllerJobExecutionResult, QpuApiError> {
283 #[cfg(feature = "tracing")]
284 tracing::debug!(
285 "retrieving job results for {} on {:?} using options {:?}",
286 job_id,
287 quantum_processor_id,
288 execution_options,
289 );
290
291 let request = GetControllerJobResultsRequest {
292 job_execution_id: job_id.0,
293 target: execution_options.get_results_target(quantum_processor_id),
294 };
295
296 let mut controller_client = execution_options
297 .get_controller_client(client, quantum_processor_id)
298 .await?;
299
300 controller_client
301 .get_controller_job_results(request)
302 .await
303 .map_err(GrpcClientError::RequestFailed)?
304 .into_inner()
305 .result
306 .ok_or_else(|| GrpcClientError::ResponseEmpty("Job Execution Results".into()))
307 .map_err(QpuApiError::from)
308 .and_then(
309 |result| match controller_job_execution_result::Status::try_from(result.status) {
310 Ok(controller_job_execution_result::Status::Success) => Ok(result),
311 Ok(status) => Err(QpuApiError::JobExecutionFailed {
312 status: status.as_str_name().to_string(),
313 message: result
314 .status_message
315 .unwrap_or("No message provided.".to_string()),
316 }),
317 Err(s) => Err(QpuApiError::InvalidJobStatus {
318 status: result.status,
319 message: s.to_string(),
320 }),
321 },
322 )
323}
324
325pub type QpuConnectionOptions = ExecutionOptions;
331pub type QpuConnectionOptionsBuilder = ExecutionOptionsBuilder;
333
334#[derive(Builder, Clone, Debug, PartialEq)]
339pub struct ExecutionOptions {
340 #[doc = "The [`ConnectionStrategy`] to use to establish a connection to the QPU."]
341 #[builder(default)]
342 connection_strategy: ConnectionStrategy,
343 #[doc = "The timeout to use for the request, defaults to 30 seconds. If set to `None`, then there is no timeout."]
344 #[builder(default = "Some(Duration::from_secs(30))")]
345 timeout: Option<Duration>,
346 #[doc = "Options available when executing a job on a QPU, particular to the execution service's API."]
347 #[builder(default = "None")]
348 api_options: Option<InnerApiExecutionOptions>,
349}
350
351impl Default for ExecutionOptions {
352 fn default() -> Self {
353 ExecutionOptionsBuilder::default().build().expect(
354 "Should be able to derive a default set of the ExecutionOptions from the builder.",
355 )
356 }
357}
358
359impl Eq for ExecutionOptions {}
360
361#[derive(Builder, Clone, Debug, Default, PartialEq)]
367#[allow(clippy::module_name_repetitions)]
368pub struct ApiExecutionOptions {
369 inner: InnerApiExecutionOptions,
371}
372
373impl Eq for ApiExecutionOptions {}
374
375impl ApiExecutionOptions {
376 #[must_use]
378 pub fn builder() -> ApiExecutionOptionsBuilder {
379 ApiExecutionOptionsBuilder::default()
380 }
381
382 #[must_use]
384 pub fn bypass_settings_protection(&self) -> bool {
385 self.inner.bypass_settings_protection
386 }
387
388 #[must_use]
399 pub fn timeout(&self) -> Option<::pbjson_types::Duration> {
400 self.inner.timeout
401 }
402}
403
404impl From<ApiExecutionOptions> for InnerApiExecutionOptions {
405 fn from(options: ApiExecutionOptions) -> Self {
406 options.inner
407 }
408}
409
410impl From<InnerApiExecutionOptions> for ApiExecutionOptions {
411 fn from(inner: InnerApiExecutionOptions) -> Self {
412 Self { inner }
413 }
414}
415
416impl ApiExecutionOptionsBuilder {
417 pub fn bypass_settings_protection(&mut self, bypass_settings_protection: bool) -> &mut Self {
419 self.inner
420 .get_or_insert(InnerApiExecutionOptions::default())
421 .bypass_settings_protection = bypass_settings_protection;
422 self
423 }
424
425 pub fn timeout(&mut self, timeout: Option<::pbjson_types::Duration>) -> &mut Self {
427 self.inner
428 .get_or_insert(InnerApiExecutionOptions::default())
429 .timeout = timeout;
430 self
431 }
432}
433
434impl ExecutionOptions {
435 #[must_use]
437 pub fn builder() -> ExecutionOptionsBuilder {
438 ExecutionOptionsBuilder::default()
439 }
440
441 #[must_use]
443 pub fn connection_strategy(&self) -> &ConnectionStrategy {
444 &self.connection_strategy
445 }
446
447 #[must_use]
449 pub fn timeout(&self) -> Option<Duration> {
450 self.timeout
451 }
452
453 #[must_use]
455 pub fn api_options(&self) -> Option<&InnerApiExecutionOptions> {
456 self.api_options.as_ref()
457 }
458}
459
460#[derive(Clone, Debug, Default, PartialEq, Eq)]
462pub enum ConnectionStrategy {
463 #[default]
465 Gateway,
466 DirectAccess,
469 EndpointId(String),
471}
472
473#[async_trait]
479pub trait ExecutionTarget<'a> {
480 fn connection_strategy(&'a self) -> &'a ConnectionStrategy;
482 fn timeout(&self) -> Option<Duration>;
484
485 fn get_job_target(
487 &'a self,
488 quantum_processor_id: Option<&str>,
489 ) -> Option<execute_controller_job_request::Target> {
490 match self.connection_strategy() {
491 ConnectionStrategy::EndpointId(endpoint_id) => Some(
492 execute_controller_job_request::Target::EndpointId(endpoint_id.to_string()),
493 ),
494 ConnectionStrategy::Gateway | ConnectionStrategy::DirectAccess => quantum_processor_id
495 .map(String::from)
496 .map(execute_controller_job_request::Target::QuantumProcessorId),
497 }
498 }
499
500 fn get_results_target(
502 &'a self,
503 quantum_processor_id: Option<&str>,
504 ) -> Option<get_controller_job_results_request::Target> {
505 match self.connection_strategy() {
506 ConnectionStrategy::EndpointId(endpoint_id) => Some(
507 get_controller_job_results_request::Target::EndpointId(endpoint_id.to_string()),
508 ),
509 ConnectionStrategy::Gateway | ConnectionStrategy::DirectAccess => quantum_processor_id
510 .map(String::from)
511 .map(get_controller_job_results_request::Target::QuantumProcessorId),
512 }
513 }
514
515 fn get_cancel_target(
517 &'a self,
518 quantum_processor_id: Option<&str>,
519 ) -> Option<cancel_controller_jobs_request::Target> {
520 match self.connection_strategy() {
521 ConnectionStrategy::EndpointId(endpoint_id) => Some(
522 cancel_controller_jobs_request::Target::EndpointId(endpoint_id.to_string()),
523 ),
524 ConnectionStrategy::Gateway | ConnectionStrategy::DirectAccess => quantum_processor_id
525 .map(String::from)
526 .map(cancel_controller_jobs_request::Target::QuantumProcessorId),
527 }
528 }
529
530 async fn get_controller_client(
532 &'a self,
533 client: &Qcs,
534 quantum_processor_id: Option<&str>,
535 ) -> Result<ControllerClient<GrpcConnection>, QpuApiError> {
536 let service = self
537 .get_qpu_grpc_connection(client, quantum_processor_id)
538 .await?;
539 Ok(ControllerClient::new(service)
540 .max_decoding_message_size(MAX_DECODING_MESSAGE_SIZE_BYTES))
541 }
542
543 async fn get_qpu_grpc_connection(
545 &'a self,
546 client: &Qcs,
547 quantum_processor_id: Option<&str>,
548 ) -> Result<GrpcConnection, QpuApiError> {
549 let address = match self.connection_strategy() {
550 ConnectionStrategy::EndpointId(endpoint_id) => {
551 let endpoint = get_endpoint(&client.get_openapi_client(), endpoint_id).await?;
552 endpoint
553 .addresses
554 .grpc
555 .ok_or_else(|| QpuApiError::EndpointNotFound(endpoint_id.into()))?
556 }
557 ConnectionStrategy::Gateway => {
558 self.get_gateway_address(
559 quantum_processor_id.ok_or(QpuApiError::MissingQpuId)?,
560 client,
561 )
562 .await?
563 }
564 ConnectionStrategy::DirectAccess => {
565 self.get_default_endpoint_address(
566 quantum_processor_id.ok_or(QpuApiError::MissingQpuId)?,
567 client,
568 )
569 .await?
570 }
571 };
572 self.grpc_address_to_channel(&address, client)
573 }
574
575 fn grpc_address_to_channel(
577 &self,
578 address: &str,
579 client: &Qcs,
580 ) -> Result<GrpcConnection, QpuApiError> {
581 let uri = parse_uri(address).map_err(QpuApiError::GrpcError)?;
582 let channel = get_channel_with_timeout(uri, self.timeout())
583 .map_err(|err| QpuApiError::GrpcError(err.into()))?;
584 let channel =
585 wrap_channel_with_retry(wrap_channel_with(channel, client.get_config().clone()));
586 #[cfg(feature = "grpc-web")]
587 let channel = wrap_channel_with_grpc_web(channel);
588 Ok(channel)
589 }
590
591 async fn get_gateway_address(
593 &self,
594 quantum_processor_id: &str,
595 client: &Qcs,
596 ) -> Result<String, QpuApiError> {
597 get_accessor_with_cache(quantum_processor_id, client).await
598 }
599
600 async fn get_default_endpoint_address(
602 &self,
603 quantum_processor_id: &str,
604 client: &Qcs,
605 ) -> Result<String, QpuApiError> {
606 get_default_endpoint_with_cache(quantum_processor_id, client).await
607 }
608}
609
610#[async_trait]
613impl<'a> ExecutionTarget<'a> for ExecutionOptions {
614 fn connection_strategy(&'a self) -> &'a ConnectionStrategy {
615 self.connection_strategy()
616 }
617
618 fn timeout(&self) -> Option<Duration> {
619 self.timeout()
620 }
621}
622
623#[cached(
624 result = true,
625 time = 60,
626 time_refresh = true,
627 sync_writes = true,
628 key = "String",
629 convert = r"{ String::from(quantum_processor_id)}"
630)]
631async fn get_accessor_with_cache(
632 quantum_processor_id: &str,
633 client: &Qcs,
634) -> Result<String, QpuApiError> {
635 #[cfg(feature = "tracing")]
636 tracing::info!(quantum_processor_id=%quantum_processor_id, "get_accessor cache miss");
637 get_accessor(quantum_processor_id, client).await
638}
639
640async fn get_accessor(quantum_processor_id: &str, client: &Qcs) -> Result<String, QpuApiError> {
641 let mut min = None;
642 let mut next_page_token = None;
643 loop {
644 let accessors = list_quantum_processor_accessors(
645 &client.get_openapi_client(),
646 quantum_processor_id,
647 Some(100),
648 next_page_token.as_deref(),
649 )
650 .await?;
651
652 let accessor = accessors
653 .accessors
654 .into_iter()
655 .filter(|acc| {
656 acc.live
657 && acc.access_type.as_deref() == Some(&QuantumProcessorAccessorType::GatewayV1)
659 })
660 .min_by_key(|acc| acc.rank.unwrap_or(i64::MAX));
661
662 min = std::cmp::min_by_key(min, accessor, |acc| {
663 acc.as_ref().and_then(|acc| acc.rank).unwrap_or(i64::MAX)
664 });
665
666 next_page_token.clone_from(&accessors.next_page_token);
667 if next_page_token.is_none() {
668 break;
669 }
670 }
671 min.map(|accessor| accessor.url)
672 .ok_or_else(|| QpuApiError::GatewayNotFound(quantum_processor_id.to_string()))
673}
674
675#[cached(
676 result = true,
677 time = 60,
678 time_refresh = true,
679 sync_writes = true,
680 key = "String",
681 convert = r"{ String::from(quantum_processor_id)}"
682)]
683async fn get_default_endpoint_with_cache(
684 quantum_processor_id: &str,
685 client: &Qcs,
686) -> Result<String, QpuApiError> {
687 #[cfg(feature = "tracing")]
688 tracing::info!(quantum_processor_id=%quantum_processor_id, "get_default_endpoint cache miss");
689 get_default_endpoint(quantum_processor_id, client).await
690}
691
692async fn get_default_endpoint(
693 quantum_processor_id: &str,
694 client: &Qcs,
695) -> Result<String, QpuApiError> {
696 let default_endpoint =
697 api_get_default_endpoint(&client.get_openapi_client(), quantum_processor_id).await?;
698 let addresses = default_endpoint.addresses.as_ref();
699 let grpc_address = addresses.grpc.as_ref();
700 grpc_address
701 .ok_or_else(|| QpuApiError::QpuEndpointNotFound(quantum_processor_id.into()))
702 .cloned()
703}
704
705#[derive(Debug, thiserror::Error)]
707pub enum QpuApiError {
708 #[error("Error configuring gRPC request: {0}")]
710 GrpcError(#[from] GrpcError<TokenError>),
711
712 #[error("Missing gRPC endpoint for endpoint ID: {0}")]
714 EndpointNotFound(String),
715
716 #[error("Missing gRPC endpoint for quantum processor: {0}")]
718 QpuEndpointNotFound(String),
719
720 #[error("Failed to get endpoint for quantum processor: {0}")]
722 QpuEndpointRequestFailed(#[from] OpenApiError<GetDefaultEndpointError>),
723
724 #[error("Failed to get accessors for quantum processor: {0}")]
726 AccessorRequestFailed(#[from] OpenApiError<ListQuantumProcessorAccessorsError>),
727
728 #[error("No gateway found for quantum processor: {0}")]
730 GatewayNotFound(String),
731
732 #[error("Failed to get endpoint for the given ID: {0}")]
734 EndpointRequestFailed(#[from] OpenApiError<GetEndpointError>),
735
736 #[error(transparent)]
738 GrpcClientError(#[from] GrpcClientError),
739
740 #[error("A quantum processor ID must be provided if not connecting directly to an endpoint ID with ConnectionStrategy::EndpointId")]
742 MissingQpuId,
743
744 #[error("Submitting a job requires at least one set of patch values")]
746 EmptyPatchValues,
747
748 #[error("The submitted job failed with status: {status}. {message}")]
750 JobExecutionFailed {
751 status: String,
753 message: String,
755 },
756 #[error("The status code could not be decoded: {0}")]
758 StatusCodeDecode(String),
759 #[error("The request returned an invalid status: {status}. {message}")]
763 InvalidJobStatus {
764 status: i32,
766 message: String,
769 },
770}
771
772#[cfg(test)]
773mod test {
774 use crate::qpu::api::ExecutionOptions;
775
776 use super::ExecutionOptionsBuilder;
777
778 #[test]
779 fn test_default_execution_options() {
780 assert_eq!(
781 ExecutionOptions::default(),
782 ExecutionOptionsBuilder::default().build().unwrap(),
783 );
784 }
785}