gcloud_longrunning/
longrunning.rs

1use std::marker::PhantomData;
2
3use google_cloud_gax::grpc::{Code, Status};
4use google_cloud_gax::retry::{invoke_fn, RetrySetting};
5use google_cloud_googleapis::longrunning::{
6    operation, CancelOperationRequest, DeleteOperationRequest, GetOperationRequest, Operation as InternalOperation,
7};
8
9use crate::autogen::operations_client::{default_retry_setting, OperationsClient};
10
11pub struct Operation<T: prost::Message + Default> {
12    inner: InternalOperation,
13    client: OperationsClient,
14    _marker: PhantomData<T>,
15}
16
17impl<T: prost::Message + Default> Operation<T> {
18    pub fn new(client: OperationsClient, inner: InternalOperation) -> Self {
19        Self {
20            client,
21            inner,
22            _marker: PhantomData,
23        }
24    }
25
26    /// Name returns the name of the long-running operation.
27    /// The name is assigned by the server and is unique within the service
28    /// from which the operation is created.
29    pub fn name(&self) -> &str {
30        &self.inner.name
31    }
32
33    /// Done reports whether the long-running operation has completed.
34    pub fn done(&self) -> bool {
35        self.inner.done
36    }
37
38    /// Poll fetches the latest state of a long-running operation.
39    ///
40    /// If Poll fails, the error is returned and op is unmodified.
41    /// If Poll succeeds and the operation has completed with failure,
42    /// the error is returned and op.Done will return true.
43    /// If Poll succeeds and the operation has completed successfully,
44    /// op.Done will return true; if resp != nil, the response of the operation
45    /// is stored in resp.
46    pub async fn poll(&mut self) -> Result<Option<T>, Status> {
47        if !self.done() {
48            let req = GetOperationRequest {
49                name: self.name().to_string(),
50            };
51            let operation = self.client.get_operation(req, None).await?;
52            self.inner = operation.into_inner()
53        }
54        if !self.done() {
55            return Ok(None);
56        }
57        let operation_result = self.inner.result.clone().unwrap();
58        match operation_result {
59            operation::Result::Response(message) => {
60                //TODO avoid unwrap
61                let decoded = T::decode(message.value.as_slice()).unwrap();
62                Ok(Some(decoded))
63            }
64            operation::Result::Error(status) => {
65                let tonic_code = tonic::Code::from(status.code);
66                Err(tonic::Status::new(tonic_code, status.message))
67            }
68        }
69    }
70
71    /// wait implements Wait, taking exponentialBackoff and sleeper arguments for testing.
72    pub async fn wait(&mut self, option: Option<RetrySetting>) -> Result<Option<T>, Status> {
73        let settings = match option {
74            Some(s) => s,
75            None => {
76                let mut setting = default_retry_setting();
77                setting.codes = vec![Code::DeadlineExceeded];
78                setting
79            }
80        };
81        invoke_fn(
82            Some(settings),
83            |me| async {
84                let poll_result: Option<T> = match me.poll().await {
85                    Ok(s) => s,
86                    Err(e) => return Err((e, me)),
87                };
88                if me.done() {
89                    Ok(poll_result)
90                } else {
91                    Err((tonic::Status::new(tonic::Code::DeadlineExceeded, "wait timeout"), me))
92                }
93            },
94            self,
95        )
96        .await
97    }
98
99    /// Cancel starts asynchronous cancellation on a long-running operation. The server
100    /// makes a best effort to cancel the operation, but success is not
101    /// guaranteed. If the server doesn't support this method, it returns
102    /// status.Code(err) == codes.Unimplemented. Clients can use
103    /// Poll or other methods to check whether the cancellation succeeded or whether the
104    /// operation completed despite cancellation. On successful cancellation,
105    /// the operation is not deleted; instead, op.Poll returns an error
106    /// with code Canceled.
107    pub async fn cancel(&mut self) -> Result<(), Status> {
108        let req = CancelOperationRequest {
109            name: self.name().to_string(),
110        };
111        self.client.cancel_operation(req, None).await.map(|_x| ())
112    }
113
114    /// Delete deletes a long-running operation. This method indicates that the client is
115    /// no longer interested in the operation result. It does not cancel the
116    /// operation. If the server doesn't support this method, status.Code(err) == codes.Unimplemented.
117    pub async fn delete(&mut self) -> Result<(), Status> {
118        let req = DeleteOperationRequest {
119            name: self.name().to_string(),
120        };
121        self.client.delete_operation(req, None).await.map(|_x| ())
122    }
123}