gcloud_longrunning/
longrunning.rs1use std::marker::PhantomData;
2
3use google_cloud_gax::grpc::{Code, Status};
4use google_cloud_gax::retry::{invoke_fn, RetrySetting};
5use google_cloud_googleapis::longrunning::{
6 operation, CancelOperationRequest, DeleteOperationRequest, GetOperationRequest, Operation as InternalOperation,
7};
8
9use crate::autogen::operations_client::{default_retry_setting, OperationsClient};
10
11pub struct Operation<T: prost::Message + Default> {
12 inner: InternalOperation,
13 client: OperationsClient,
14 _marker: PhantomData<T>,
15}
16
17impl<T: prost::Message + Default> Operation<T> {
18 pub fn new(client: OperationsClient, inner: InternalOperation) -> Self {
19 Self {
20 client,
21 inner,
22 _marker: PhantomData,
23 }
24 }
25
26 pub fn name(&self) -> &str {
30 &self.inner.name
31 }
32
33 pub fn done(&self) -> bool {
35 self.inner.done
36 }
37
38 pub async fn poll(&mut self) -> Result<Option<T>, Status> {
47 if !self.done() {
48 let req = GetOperationRequest {
49 name: self.name().to_string(),
50 };
51 let operation = self.client.get_operation(req, None).await?;
52 self.inner = operation.into_inner()
53 }
54 if !self.done() {
55 return Ok(None);
56 }
57 let operation_result = self.inner.result.clone().unwrap();
58 match operation_result {
59 operation::Result::Response(message) => {
60 let decoded = T::decode(message.value.as_slice()).unwrap();
62 Ok(Some(decoded))
63 }
64 operation::Result::Error(status) => {
65 let tonic_code = tonic::Code::from(status.code);
66 Err(tonic::Status::new(tonic_code, status.message))
67 }
68 }
69 }
70
71 pub async fn wait(&mut self, option: Option<RetrySetting>) -> Result<Option<T>, Status> {
73 let settings = match option {
74 Some(s) => s,
75 None => {
76 let mut setting = default_retry_setting();
77 setting.codes = vec![Code::DeadlineExceeded];
78 setting
79 }
80 };
81 invoke_fn(
82 Some(settings),
83 |me| async {
84 let poll_result: Option<T> = match me.poll().await {
85 Ok(s) => s,
86 Err(e) => return Err((e, me)),
87 };
88 if me.done() {
89 Ok(poll_result)
90 } else {
91 Err((tonic::Status::new(tonic::Code::DeadlineExceeded, "wait timeout"), me))
92 }
93 },
94 self,
95 )
96 .await
97 }
98
99 pub async fn cancel(&mut self) -> Result<(), Status> {
108 let req = CancelOperationRequest {
109 name: self.name().to_string(),
110 };
111 self.client.cancel_operation(req, None).await.map(|_x| ())
112 }
113
114 pub async fn delete(&mut self) -> Result<(), Status> {
118 let req = DeleteOperationRequest {
119 name: self.name().to_string(),
120 };
121 self.client.delete_operation(req, None).await.map(|_x| ())
122 }
123}