1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
use crate::autogen::operations_client::OperationsClient;
use google_cloud_gax::call_option::{BackoffRetrySettings, BackoffRetryer};
use google_cloud_gax::invoke::invoke_reuse;
use google_cloud_googleapis::longrunning::{
    operation, CancelOperationRequest, DeleteOperationRequest, GetOperationRequest,
    Operation as InternalOperation,
};
use google_cloud_googleapis::{Code, Status};
use std::marker::PhantomData;

pub struct Operation<T: prost::Message + Default> {
    inner: InternalOperation,
    client: OperationsClient,
    _marker: PhantomData<T>,
}

impl<T: prost::Message + Default> Operation<T> {
    pub fn new(client: OperationsClient, inner: InternalOperation) -> Self {
        Self {
            client,
            inner,
            _marker: PhantomData::default(),
        }
    }

    /// Name returns the name of the long-running operation.
    /// The name is assigned by the server and is unique within the service
    /// from which the operation is created.
    pub fn name(&self) -> &str {
        &self.inner.name
    }

    /// Done reports whether the long-running operation has completed.
    pub fn done(&self) -> bool {
        self.inner.done
    }

    /// Poll fetches the latest state of a long-running operation.
    ///
    /// If Poll fails, the error is returned and op is unmodified.
    /// If Poll succeeds and the operation has completed with failure,
    /// the error is returned and op.Done will return true.
    /// If Poll succeeds and the operation has completed successfully,
    /// op.Done will return true; if resp != nil, the response of the operation
    /// is stored in resp.
    pub async fn poll(&mut self) -> Result<Option<T>, Status> {
        if !self.done() {
            let operation = self
                .client
                .get_operation(
                    GetOperationRequest {
                        name: self.name().to_string(),
                    },
                    None,
                )
                .await?;
            self.inner = operation.into_inner()
        }
        if !self.done() {
            return Ok(None);
        }
        let operation_result = self.inner.result.clone().unwrap();
        match operation_result {
            operation::Result::Response(message) => {
                //TODO avoid unwrap
                let decoded = T::decode(message.value.as_slice()).unwrap();
                Ok(Some(decoded))
            }
            operation::Result::Error(status) => {
                let tonic_code = tonic::Code::from(status.code);
                Err(tonic::Status::new(tonic_code, status.message.to_string()).into())
            }
        }
    }

    /// wait implements Wait, taking exponentialBackoff and sleeper arguments for testing.
    pub async fn wait(
        &mut self,
        option: Option<BackoffRetrySettings>,
    ) -> Result<Option<T>, Status> {
        let mut settings = match option {
            Some(s) => s,
            None => BackoffRetrySettings {
                retryer: BackoffRetryer {
                    backoff: Default::default(),
                    codes: vec![Code::DeadlineExceeded],
                },
            },
        };
        return invoke_reuse(
            |me| async {
                let poll_result: Option<T> = match me.poll().await {
                    Ok(s) => s,
                    Err(e) => return Err((e, me)),
                };
                if me.done() {
                    Ok(poll_result)
                } else {
                    Err((
                        tonic::Status::new(tonic::Code::DeadlineExceeded, "wait timeout").into(),
                        me,
                    ))
                }
            },
            self,
            &mut settings,
        )
        .await;
    }

    /// Cancel starts asynchronous cancellation on a long-running operation. The server
    /// makes a best effort to cancel the operation, but success is not
    /// guaranteed. If the server doesn't support this method, it returns
    /// status.Code(err) == codes.Unimplemented. Clients can use
    /// Poll or other methods to check whether the cancellation succeeded or whether the
    /// operation completed despite cancellation. On successful cancellation,
    /// the operation is not deleted; instead, op.Poll returns an error
    /// with code Canceled.
    pub async fn cancel(&mut self) -> Result<(), Status> {
        self.client
            .cancel_operation(
                CancelOperationRequest {
                    name: self.name().to_string(),
                },
                None,
            )
            .await
            .map(|_x| ())
    }

    /// Delete deletes a long-running operation. This method indicates that the client is
    /// no longer interested in the operation result. It does not cancel the
    /// operation. If the server doesn't support this method, status.Code(err) == codes.Unimplemented.
    pub async fn delete(&mut self) -> Result<(), Status> {
        self.client
            .delete_operation(
                DeleteOperationRequest {
                    name: self.name().to_string(),
                },
                None,
            )
            .await
            .map(|_x| ())
    }
}