google_cloud_lro/lib.rs
1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Types and functions to make LROs easier to use and to require less boilerplate.
16//!
17//! Occasionally, a Google Cloud service may need to expose a method that takes
18//! a significant amount of time to complete. In these situations, it is often
19//! a poor user experience to simply block while the task runs. Such services
20//! return a long-running operation, a type of promise that can be polled until
21//! it completes successfully.
22//!
23//! Polling these operations can be tedious. The application needs to
24//! periodically make RPCs, extract the result from the response, and may need
25//! to implement a stream to return metadata representing any progress in the
26//! RPC.
27//!
//! The Google Cloud client libraries for Rust return implementations of the
//! [Poller] trait to simplify working with these long-running operations.
30//!
31//! # Example: automatically poll until completion
32//! ```no_run
33//! # use google_cloud_lro::{internal::Operation, Poller};
34//! # use serde::{Deserialize, Serialize};
35//! # use google_cloud_gax::Result;
36//! # use google_cloud_wkt::Timestamp as Response;
37//! # use google_cloud_wkt::Duration as Metadata;
38//! async fn start_lro() -> impl Poller<Response, Metadata> {
39//! // ... details omitted ...
40//! # async fn start() -> Result<Operation<Response, Metadata>> { panic!(); }
41//! # async fn query(_: String) -> Result<Operation<Response, Metadata>> { panic!(); }
42//! # google_cloud_lro::internal::new_poller(
43//! # std::sync::Arc::new(google_cloud_gax::polling_error_policy::AlwaysContinue),
44//! # std::sync::Arc::new(google_cloud_gax::exponential_backoff::ExponentialBackoff::default()),
45//! # start, query
46//! # )
47//! }
48//! # async fn sample() -> anyhow::Result<()> {
49//! let response = start_lro()
50//! .await
51//! .until_done()
52//! .await?;
53//! println!("response = {response:?}");
54//! # Ok(()) }
55//! ```
56//!
57//! # Example: poll with metadata
58//! ```
59//! # use google_cloud_lro::{internal::Operation, Poller, PollingResult};
60//! # use serde::{Deserialize, Serialize};
61//! # use google_cloud_gax::Result;
62//! # use google_cloud_wkt::Timestamp as Response;
63//! # use google_cloud_wkt::Duration as Metadata;
64//!
65//! async fn start_lro() -> impl Poller<Response, Metadata> {
66//! // ... details omitted ...
67//! # async fn start() -> Result<Operation<Response, Metadata>> { panic!(); }
68//! # async fn query(_: String) -> Result<Operation<Response, Metadata>> { panic!(); }
69//! # google_cloud_lro::internal::new_poller(
70//! # std::sync::Arc::new(google_cloud_gax::polling_error_policy::AlwaysContinue),
71//! # std::sync::Arc::new(google_cloud_gax::exponential_backoff::ExponentialBackoff::default()),
72//! # start, query
73//! # )
74//! }
75//! # async fn sample() -> anyhow::Result<()> {
76//! let mut poller = start_lro().await;
77//! while let Some(p) = poller.poll().await {
78//! match p {
79//! PollingResult::Completed(r) => { println!("LRO completed, response={r:?}"); }
80//! PollingResult::InProgress(m) => { println!("LRO in progress, metadata={m:?}"); }
81//! PollingResult::PollingError(e) => { println!("Transient error polling the LRO: {e}"); }
82//! }
83//! tokio::time::sleep(std::time::Duration::from_millis(100)).await;
84//! }
85//! # Ok(()) }
86//! ```
87
88#![cfg_attr(docsrs, feature(doc_cfg))]
89
90use google_cloud_gax::Result;
91use google_cloud_gax::error::Error;
92use google_cloud_gax::polling_backoff_policy::PollingBackoffPolicy;
93use google_cloud_gax::polling_error_policy::PollingErrorPolicy;
94use google_cloud_gax::polling_state::PollingState;
95use std::future::Future;
96
/// The result of polling a Long-Running Operation (LRO).
///
/// A single poll produces exactly one of these variants: the operation is
/// still running, the operation finished, or the poll itself failed.
///
/// # Parameters
/// * `ResponseType` - This is the type returned when the LRO completes
///   successfully.
/// * `MetadataType` - The LRO may return values of this type while the
///   operation is in progress. This may include some measure of "progress".
#[derive(Debug)]
// NOTE(review): presumably this enum is exhaustive on purpose, so callers can
// `match` on every variant without a wildcard arm — confirm before adding
// variants.
#[allow(clippy::exhaustive_enums)]
pub enum PollingResult<ResponseType, MetadataType> {
    /// The operation is still in progress.
    ///
    /// Carries the operation metadata when it is available.
    InProgress(Option<MetadataType>),
    /// The operation completed. This includes the result.
    ///
    /// The payload is a `Result`, so completion may carry either the response
    /// or an error.
    Completed(Result<ResponseType>),
    /// An error trying to poll the LRO.
    ///
    /// Not all errors indicate that the operation failed. For example, this
    /// may fail because it was not possible to connect to Google Cloud. Such
    /// transient errors may disappear in the next polling attempt.
    ///
    /// Other errors will never recover. For example, an [Error] with
    /// a [NOT_FOUND], [ABORTED], or [PERMISSION_DENIED] status code will never
    /// recover.
    ///
    /// [Error]: google_cloud_gax::error::Error
    /// [NOT_FOUND]: google_cloud_rpc::model::Code::NotFound
    /// [ABORTED]: google_cloud_rpc::model::Code::Aborted
    /// [PERMISSION_DENIED]: google_cloud_rpc::model::Code::PermissionDenied
    PollingError(Error),
}
127
// Hidden from the generated documentation unless the `_internal-semver`
// feature is enabled.
// NOTE(review): presumably this module is not part of the supported public API
// even though it is `pub` — confirm.
#[cfg_attr(not(feature = "_internal-semver"), doc(hidden))]
#[allow(missing_docs)]
pub mod internal;
131
// Re-export these `internal` types at the crate root. Only compiled when the
// `google_cloud_unstable_tracing` cfg flag is set.
#[cfg(google_cloud_unstable_tracing)]
pub use internal::{PollerOptions, TracingDetails};
134
// A task-local `u32` declared through `tokio::task_local!`. Only compiled when
// the `google_cloud_unstable_tracing` cfg flag is set, and hidden from the
// generated documentation.
// NOTE(review): presumably holds the number of the current polling attempt so
// tracing code can read it — confirm against the code that sets it.
#[cfg(google_cloud_unstable_tracing)]
#[doc(hidden)]
tokio::task_local! {
    pub static POLL_ATTEMPT_COUNT: u32;
}
140
// Holds the crate-private half of the poller contract. The module is
// `pub(crate)`, so although the trait inside is `pub`, it cannot be named
// through this path from outside the crate.
pub(crate) mod sealed {
    use google_cloud_gax::polling_state::PollingState;
    use std::future::Future;

    /// Supertrait required by the public `Poller` trait in the parent module.
    pub trait Poller {
        /// Sleep until the backoff time has elapsed.
        ///
        /// `state` describes the polling loop so far. The returned future
        /// resolves to `()` and must be `Send`.
        fn backoff(&mut self, state: &PollingState) -> impl Future<Output = ()> + Send;
    }
}
150
/// Automatically polls long-running operations.
///
/// Implementations must be `Send` and must also implement the crate-private
/// `sealed::Poller` supertrait.
///
/// # Parameters
/// * `ResponseType` - This is the type returned when the LRO completes
///   successfully.
/// * `MetadataType` - The LRO may return values of this type while the
///   operation is in progress. This may include some measure of "progress".
pub trait Poller<ResponseType, MetadataType>: Send + sealed::Poller {
    /// Query the current status of the long-running operation.
    ///
    /// Yields `Some` with the outcome of this polling attempt. The polling
    /// loop in this crate relies on `None` only being returned after an
    /// earlier call already returned [PollingResult::Completed].
    fn poll(
        &mut self,
    ) -> impl Future<Output = Option<PollingResult<ResponseType, MetadataType>>> + Send;

    /// Poll the long-running operation until it completes.
    ///
    /// Consumes the poller and resolves to the final result of the operation.
    fn until_done(self) -> impl Future<Output = Result<ResponseType>> + Send;

    /// Convert a poller to a [Stream][futures::Stream].
    ///
    /// Consumes the poller; each stream item is a [PollingResult]. Only
    /// available when the `unstable-stream` feature is enabled.
    #[cfg(feature = "unstable-stream")]
    #[cfg_attr(docsrs, doc(cfg(feature = "unstable-stream")))]
    fn into_stream(
        self,
    ) -> impl futures::Stream<Item = PollingResult<ResponseType, MetadataType>> + Unpin;
}
174
175/// The default implementation of the until_done loop used by most Poller implementations.
176///
177/// This encapsulates the common loop of repeatedly calling `poll()`, querying backoff/error policies,
178/// updating the `PollingState`, sleeping between attempts, and handling final completion.
179async fn until_done<P, R, M>(mut poller: P) -> Result<R>
180where
181 P: Poller<R, M> + Send,
182{
183 let mut state = PollingState::default();
184 while let Some(p) = poller.poll().await {
185 match p {
186 // Return, the operation completed or the polling policy is
187 // exhausted.
188 PollingResult::Completed(r) => return r,
189 // Continue, the operation was successfully polled and the
190 // polling policy was queried.
191 PollingResult::InProgress(_) => (),
192 // Continue, the polling policy was queried and decided the
193 // error is recoverable.
194 PollingResult::PollingError(_) => (),
195 }
196 state.attempt_count += 1;
197 poller.backoff(&state).await
198 }
199 // We can only get here if `poll()` returns `None`, but it only returns
200 // `None` after it returned `Polling::Completed` and therefore this is
201 // never reached.
202 unreachable!("loop should exit via the `Completed` branch vs. this line");
203}
204
/// Shared `into_stream` adapter for `Poller` implementations.
///
/// Builds a stream by unfolding the poller: each step awaits `poll()` and
/// emits its item, and the stream ends the first time `poll()` yields `None`.
/// The stream is boxed and pinned so the returned value is `Unpin`.
#[cfg(feature = "unstable-stream")]
#[cfg_attr(docsrs, doc(cfg(feature = "unstable-stream")))]
fn into_stream<P, R, M>(poller: P) -> impl futures::Stream<Item = PollingResult<R, M>> + Unpin
where
    P: Poller<R, M> + Send,
{
    let stream = futures::stream::unfold(poller, |mut current| async move {
        // `?` ends the stream as soon as the poller reports `None`.
        let item = current.poll().await?;
        Some((item, current))
    });
    Box::pin(stream)
}
219
// Private module; its contents are not visible from this file.
// NOTE(review): presumably contains the concrete poller implementations that
// call the helper functions above — confirm.
mod details;