google_cloud_pubsub/publisher/model_ext.rs
1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::future::Future;
16use std::pin::Pin;
17use std::task::{Context, Poll, ready};
18use tokio::sync::oneshot;
19
20/// A handle that represents an in-flight publish operation.
21///
22/// This struct is a `Future`. You can `.await` it to get the final
23/// result of the publish call: either a server-assigned message ID `String`
24/// or an `Error` if the publish failed.
25///
26/// A `PublishHandle` is returned from every call to [`Publisher::publish`][crate::client::Publisher::publish]
27///
28/// # Example
29///
30/// ```
31/// # use google_cloud_pubsub::client::Publisher;
32/// # use google_cloud_pubsub::model::PubsubMessage;
33/// # async fn sample(publisher: Publisher) -> anyhow::Result<()> {
34/// // publish() returns a handle immediately.
35/// let handle = publisher.publish(PubsubMessage::new().set_data("hello world"));
36///
37/// // The handle can be awaited later to get the result.
38/// match handle.await {
39/// Ok(message_id) => println!("Message published with ID: {message_id}"),
40/// Err(e) => eprintln!("Failed to publish message: {e:?}"),
41/// }
42/// # Ok(())
43/// # }
44/// ```
45pub struct PublishHandle {
46 pub(crate) rx: oneshot::Receiver<std::result::Result<String, crate::error::PublishError>>,
47}
48
49impl Future for PublishHandle {
50 /// The result of the publish operation.
51 /// - `Ok(String)`: The server-assigned message ID.
52 /// - `Err(Error)`: An error indicating the publish failed.
53 type Output = crate::Result<String>;
54
55 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
56 let result = ready!(Pin::new(&mut self.rx).poll(cx));
57 // An error will only occur if the sender of the self.rx was dropped,
58 // which would be a bug.
59 Poll::Ready(
60 result
61 .expect("the client library should not release the sender")
62 .map_err(convert_error),
63 )
64 }
65}
66
67fn convert_error(e: crate::error::PublishError) -> gax::error::Error {
68 // TODO(#3689): The error type for these are not ideal, we will need will
69 // need to handle error propagation better.
70 match e {
71 crate::error::PublishError::SendError(s) => gax::error::Error::io(s.clone()),
72 crate::error::PublishError::OrderingKeyPaused(()) => gax::error::Error::io(e),
73 }
74}