Skip to main content

google_cloud_lro/
lib.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Types and functions to make LROs easier to use and to require less boilerplate.
16//!
17//! Occasionally, a Google Cloud service may need to expose a method that takes
18//! a significant amount of time to complete. In these situations, it is often
19//! a poor user experience to simply block while the task runs. Such services
20//! return a long-running operation, a type of promise that can be polled until
21//! it completes successfully.
22//!
23//! Polling these operations can be tedious. The application needs to
24//! periodically make RPCs, extract the result from the response, and may need
25//! to implement a stream to return metadata representing any progress in the
26//! RPC.
27//!
//! The Google Cloud client libraries for Rust return implementations of the
//! [Poller] trait to simplify working with these long-running operations.
30//!
31//! # Example: automatically poll until completion
32//! ```no_run
33//! # use google_cloud_lro::{internal::Operation, Poller};
34//! # use serde::{Deserialize, Serialize};
35//! # use google_cloud_gax::Result;
36//! # use google_cloud_wkt::Timestamp as Response;
37//! # use google_cloud_wkt::Duration as Metadata;
38//! async fn start_lro() -> impl Poller<Response, Metadata> {
39//!     // ... details omitted ...
40//!     # async fn start() -> Result<Operation<Response, Metadata>> { panic!(); }
41//!     # async fn query(_: String) -> Result<Operation<Response, Metadata>> { panic!(); }
42//!     # google_cloud_lro::internal::new_poller(
43//!     #    std::sync::Arc::new(google_cloud_gax::polling_error_policy::AlwaysContinue),
44//!     #    std::sync::Arc::new(google_cloud_gax::exponential_backoff::ExponentialBackoff::default()),
45//!     #    start, query
46//!     # )
47//! }
48//! # async fn sample() -> anyhow::Result<()> {
49//! let response = start_lro()
50//!     .await
51//!     .until_done()
52//!     .await?;
53//! println!("response = {response:?}");
54//! # Ok(()) }
55//! ```
56//!
57//! # Example: poll with metadata
58//! ```
59//! # use google_cloud_lro::{internal::Operation, Poller, PollingResult};
60//! # use serde::{Deserialize, Serialize};
61//! # use google_cloud_gax::Result;
62//! # use google_cloud_wkt::Timestamp as Response;
63//! # use google_cloud_wkt::Duration as Metadata;
64//!
65//! async fn start_lro() -> impl Poller<Response, Metadata> {
66//!     // ... details omitted ...
67//!     # async fn start() -> Result<Operation<Response, Metadata>> { panic!(); }
68//!     # async fn query(_: String) -> Result<Operation<Response, Metadata>> { panic!(); }
69//!     # google_cloud_lro::internal::new_poller(
70//!     #    std::sync::Arc::new(google_cloud_gax::polling_error_policy::AlwaysContinue),
71//!     #    std::sync::Arc::new(google_cloud_gax::exponential_backoff::ExponentialBackoff::default()),
72//!     #    start, query
73//!     # )
74//! }
75//! # async fn sample() -> anyhow::Result<()> {
76//! let mut poller = start_lro().await;
77//! while let Some(p) = poller.poll().await {
78//!     match p {
79//!         PollingResult::Completed(r) => { println!("LRO completed, response={r:?}"); }
80//!         PollingResult::InProgress(m) => { println!("LRO in progress, metadata={m:?}"); }
81//!         PollingResult::PollingError(e) => { println!("Transient error polling the LRO: {e}"); }
82//!     }
83//!     tokio::time::sleep(std::time::Duration::from_millis(100)).await;
84//! }
85//! # Ok(()) }
86//! ```
87
88#![cfg_attr(docsrs, feature(doc_cfg))]
89
90use google_cloud_gax::Result;
91use google_cloud_gax::error::Error;
92use google_cloud_gax::polling_backoff_policy::PollingBackoffPolicy;
93use google_cloud_gax::polling_error_policy::PollingErrorPolicy;
94use google_cloud_gax::polling_state::PollingState;
95use std::future::Future;
96
/// The result of polling a Long-Running Operation (LRO).
///
/// Each value describes the outcome of a single polling attempt: the
/// operation is still running, it finished, or the attempt itself failed.
///
/// # Parameters
/// * `ResponseType` - This is the type returned when the LRO completes
///   successfully.
/// * `MetadataType` - The LRO may return values of this type while the
///   operation is in progress. This may include some measure of "progress".
#[derive(Debug)]
// NOTE(review): presumably the enum is intentionally exhaustive so callers can
// `match` on it without a wildcard arm — confirm before adding variants.
#[allow(clippy::exhaustive_enums)]
pub enum PollingResult<ResponseType, MetadataType> {
    /// The operation is still in progress.
    ///
    /// The metadata is optional: a polling attempt may report that the
    /// operation is running without providing any metadata.
    InProgress(Option<MetadataType>),
    /// The operation completed. This includes the result.
    ///
    /// The payload is a `Result`: it holds either the response or the error
    /// that terminated the polling loop.
    Completed(Result<ResponseType>),
    /// An error trying to poll the LRO.
    ///
    /// Not all errors indicate that the operation failed. For example, this
    /// may fail because it was not possible to connect to Google Cloud. Such
    /// transient errors may disappear in the next polling attempt.
    ///
    /// Other errors will never recover. For example, an [Error] with
    /// a [NOT_FOUND], [ABORTED], or [PERMISSION_DENIED] status code will never
    /// recover.
    ///
    /// [Error]: google_cloud_gax::error::Error
    /// [NOT_FOUND]: google_cloud_rpc::model::Code::NotFound
    /// [ABORTED]: google_cloud_rpc::model::Code::Aborted
    /// [PERMISSION_DENIED]: google_cloud_rpc::model::Code::PermissionDenied
    PollingError(Error),
}
127
128#[cfg_attr(not(feature = "_internal-semver"), doc(hidden))]
129#[allow(missing_docs)]
130pub mod internal;
131
132#[cfg(google_cloud_unstable_tracing)]
133pub use internal::{PollerOptions, TracingDetails};
134
// Task-local `u32` declared through `tokio::task_local!`. It is only compiled
// when the `google_cloud_unstable_tracing` cfg is set and is hidden from the
// generated documentation.
// NOTE(review): presumably carries the current poll attempt number for tracing
// purposes — confirm against the code that sets and reads it.
#[cfg(google_cloud_unstable_tracing)]
#[doc(hidden)]
tokio::task_local! {
    pub static POLL_ATTEMPT_COUNT: u32;
}
140
// Holds the supertrait required by the public `Poller` trait. Because this
// module is `pub(crate)`, code outside the crate cannot name `sealed::Poller`
// through this path.
pub(crate) mod sealed {
    use google_cloud_gax::polling_state::PollingState;
    use std::future::Future;

    /// Crate-private half of the poller contract.
    pub trait Poller {
        /// Sleep until the backoff time has elapsed.
        ///
        /// `state` is the polling state accumulated so far. The returned
        /// future resolves to `()` and must be `Send`.
        fn backoff(&mut self, state: &PollingState) -> impl Future<Output = ()> + Send;
    }
}
150
/// Automatically polls long-running operations.
///
/// Implementations must be `Send` and must also implement the crate-private
/// `sealed::Poller` supertrait.
///
/// # Parameters
/// * `ResponseType` - This is the type returned when the LRO completes
///   successfully.
/// * `MetadataType` - The LRO may return values of this type while the
///   operation is in progress. This may include some measure of "progress".
pub trait Poller<ResponseType, MetadataType>: Send + sealed::Poller {
    /// Query the current status of the long-running operation.
    ///
    /// The returned future resolves to `Some` with the [PollingResult] of
    /// this attempt, or to `None`. The `until_done` helper in this crate
    /// assumes `None` is only returned after an earlier call returned
    /// [PollingResult::Completed].
    fn poll(
        &mut self,
    ) -> impl Future<Output = Option<PollingResult<ResponseType, MetadataType>>> + Send;

    /// Poll the long-running operation until it completes.
    ///
    /// Consumes the poller. The returned future resolves to the final
    /// `Result<ResponseType>` of the operation.
    fn until_done(self) -> impl Future<Output = Result<ResponseType>> + Send;

    /// Convert a poller to a [Stream][futures::Stream].
    ///
    /// Consumes the poller; the stream yields [PollingResult] items. Only
    /// available when the `unstable-stream` feature is enabled.
    #[cfg(feature = "unstable-stream")]
    #[cfg_attr(docsrs, doc(cfg(feature = "unstable-stream")))]
    fn into_stream(
        self,
    ) -> impl futures::Stream<Item = PollingResult<ResponseType, MetadataType>> + Unpin;
}
174
175/// The default implementation of the until_done loop used by most Poller implementations.
176///
177/// This encapsulates the common loop of repeatedly calling `poll()`, querying backoff/error policies,
178/// updating the `PollingState`, sleeping between attempts, and handling final completion.
179async fn until_done<P, R, M>(mut poller: P) -> Result<R>
180where
181    P: Poller<R, M> + Send,
182{
183    let mut state = PollingState::default();
184    while let Some(p) = poller.poll().await {
185        match p {
186            // Return, the operation completed or the polling policy is
187            // exhausted.
188            PollingResult::Completed(r) => return r,
189            // Continue, the operation was successfully polled and the
190            // polling policy was queried.
191            PollingResult::InProgress(_) => (),
192            // Continue, the polling policy was queried and decided the
193            // error is recoverable.
194            PollingResult::PollingError(_) => (),
195        }
196        state.attempt_count += 1;
197        poller.backoff(&state).await
198    }
199    // We can only get here if `poll()` returns `None`, but it only returns
200    // `None` after it returned `Polling::Completed` and therefore this is
201    // never reached.
202    unreachable!("loop should exit via the `Completed` branch vs. this line");
203}
204
/// Shared `into_stream` implementation for `Poller` implementations.
///
/// Each stream item is one value produced by `poll()`. The stream ends the
/// first time `poll()` yields `None`. The stream is boxed and pinned, which
/// is what lets the return type promise `Unpin`.
#[cfg(feature = "unstable-stream")]
#[cfg_attr(docsrs, doc(cfg(feature = "unstable-stream")))]
fn into_stream<P, R, M>(poller: P) -> impl futures::Stream<Item = PollingResult<R, M>> + Unpin
where
    P: Poller<R, M> + Send,
{
    // One step of the stream: poll once, then hand the poller back as the
    // state for the next step. A `None` from `poll()` ends the stream.
    let step = |mut poller: P| async move {
        let item = poller.poll().await?;
        Some((item, poller))
    };
    Box::pin(futures::stream::unfold(poller, step))
}
219
220mod details;