1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
use std::marker::PhantomData;
use google_cloud_gax::grpc::{Code, Status};
use google_cloud_gax::retry::{invoke_fn, RetrySetting};
use google_cloud_googleapis::longrunning::{
operation, CancelOperationRequest, DeleteOperationRequest, GetOperationRequest, Operation as InternalOperation,
};
use crate::autogen::operations_client::{default_retry_setting, OperationsClient};
pub struct Operation<T: prost::Message + Default> {
inner: InternalOperation,
client: OperationsClient,
_marker: PhantomData<T>,
}
impl<T: prost::Message + Default> Operation<T> {
pub fn new(client: OperationsClient, inner: InternalOperation) -> Self {
Self {
client,
inner,
_marker: PhantomData,
}
}
/// Name returns the name of the long-running operation.
/// The name is assigned by the server and is unique within the service
/// from which the operation is created.
pub fn name(&self) -> &str {
&self.inner.name
}
/// Done reports whether the long-running operation has completed.
pub fn done(&self) -> bool {
self.inner.done
}
/// Poll fetches the latest state of a long-running operation.
///
/// If Poll fails, the error is returned and op is unmodified.
/// If Poll succeeds and the operation has completed with failure,
/// the error is returned and op.Done will return true.
/// If Poll succeeds and the operation has completed successfully,
/// op.Done will return true; if resp != nil, the response of the operation
/// is stored in resp.
pub async fn poll(&mut self) -> Result<Option<T>, Status> {
if !self.done() {
let req = GetOperationRequest {
name: self.name().to_string(),
};
let operation = self.client.get_operation(req, None).await?;
self.inner = operation.into_inner()
}
if !self.done() {
return Ok(None);
}
let operation_result = self.inner.result.clone().unwrap();
match operation_result {
operation::Result::Response(message) => {
//TODO avoid unwrap
let decoded = T::decode(message.value.as_slice()).unwrap();
Ok(Some(decoded))
}
operation::Result::Error(status) => {
let tonic_code = tonic::Code::from(status.code);
Err(tonic::Status::new(tonic_code, status.message))
}
}
}
/// wait implements Wait, taking exponentialBackoff and sleeper arguments for testing.
pub async fn wait(&mut self, option: Option<RetrySetting>) -> Result<Option<T>, Status> {
let settings = match option {
Some(s) => s,
None => {
let mut setting = default_retry_setting();
setting.codes = vec![Code::DeadlineExceeded];
setting
}
};
invoke_fn(
Some(settings),
|me| async {
let poll_result: Option<T> = match me.poll().await {
Ok(s) => s,
Err(e) => return Err((e, me)),
};
if me.done() {
Ok(poll_result)
} else {
Err((tonic::Status::new(tonic::Code::DeadlineExceeded, "wait timeout"), me))
}
},
self,
)
.await
}
/// Cancel starts asynchronous cancellation on a long-running operation. The server
/// makes a best effort to cancel the operation, but success is not
/// guaranteed. If the server doesn't support this method, it returns
/// status.Code(err) == codes.Unimplemented. Clients can use
/// Poll or other methods to check whether the cancellation succeeded or whether the
/// operation completed despite cancellation. On successful cancellation,
/// the operation is not deleted; instead, op.Poll returns an error
/// with code Canceled.
pub async fn cancel(&mut self) -> Result<(), Status> {
let req = CancelOperationRequest {
name: self.name().to_string(),
};
self.client.cancel_operation(req, None).await.map(|_x| ())
}
/// Delete deletes a long-running operation. This method indicates that the client is
/// no longer interested in the operation result. It does not cancel the
/// operation. If the server doesn't support this method, status.Code(err) == codes.Unimplemented.
pub async fn delete(&mut self) -> Result<(), Status> {
let req = DeleteOperationRequest {
name: self.name().to_string(),
};
self.client.delete_operation(req, None).await.map(|_x| ())
}
}