1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
use crate::autogen::operations_client::{default_retry_setting, OperationsClient};
use google_cloud_gax::cancel::CancellationToken;
use google_cloud_gax::grpc::{Code, Status};
use google_cloud_gax::retry::{invoke_fn, RetrySetting};
use google_cloud_googleapis::longrunning::{
operation, CancelOperationRequest, DeleteOperationRequest, GetOperationRequest, Operation as InternalOperation,
};
use std::marker::PhantomData;
/// Handle to a long-running operation whose successful response payload is
/// decoded as the protobuf message type `T`.
pub struct Operation<T: prost::Message + Default> {
/// Most recently stored snapshot of the operation; replaced by `poll` while
/// the operation is not yet done.
inner: InternalOperation,
/// Client used to issue the get / cancel / delete requests for this operation.
client: OperationsClient,
/// Records the response type `T` without storing a value of it.
_marker: PhantomData<T>,
}
impl<T: prost::Message + Default> Operation<T> {
pub fn new(client: OperationsClient, inner: InternalOperation) -> Self {
Self {
client,
inner,
_marker: PhantomData::default(),
}
}
pub fn name(&self) -> &str {
&self.inner.name
}
pub fn done(&self) -> bool {
self.inner.done
}
pub async fn poll(&mut self, cancel: Option<CancellationToken>) -> Result<Option<T>, Status> {
if !self.done() {
let req = GetOperationRequest {
name: self.name().to_string(),
};
let operation = self.client.get_operation(req, cancel, None).await?;
self.inner = operation.into_inner()
}
if !self.done() {
return Ok(None);
}
let operation_result = self.inner.result.clone().unwrap();
match operation_result {
operation::Result::Response(message) => {
let decoded = T::decode(message.value.as_slice()).unwrap();
Ok(Some(decoded))
}
operation::Result::Error(status) => {
let tonic_code = tonic::Code::from(status.code);
Err(tonic::Status::new(tonic_code, status.message))
}
}
}
pub async fn wait(
&mut self,
cancel: Option<CancellationToken>,
option: Option<RetrySetting>,
) -> Result<Option<T>, Status> {
let settings = match option {
Some(s) => s,
None => {
let mut setting = default_retry_setting();
setting.codes = vec![Code::DeadlineExceeded];
setting
}
};
invoke_fn(
cancel,
Some(settings),
|me| async {
let poll_result: Option<T> = match me.poll(None).await {
Ok(s) => s,
Err(e) => return Err((e, me)),
};
if me.done() {
Ok(poll_result)
} else {
Err((tonic::Status::new(tonic::Code::DeadlineExceeded, "wait timeout"), me))
}
},
self,
)
.await
}
pub async fn cancel(&mut self, cancel: Option<CancellationToken>) -> Result<(), Status> {
let req = CancelOperationRequest {
name: self.name().to_string(),
};
self.client.cancel_operation(req, cancel, None).await.map(|_x| ())
}
pub async fn delete(&mut self, cancel: Option<CancellationToken>) -> Result<(), Status> {
let req = DeleteOperationRequest {
name: self.name().to_string(),
};
self.client.delete_operation(req, cancel, None).await.map(|_x| ())
}
}