Skip to main content

google_cloud_pubsub/publisher/
model_ext.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::future::Future;
16use std::pin::Pin;
17use std::task::{Context, Poll, ready};
18use tokio::sync::oneshot;
19
20/// A handle that represents an in-flight publish operation.
21///
22/// This struct is a `Future`. You can `.await` it to get the final
23/// result of the publish call: either a server-assigned message ID `String`
24/// or an `Error` if the publish failed.
25///
26/// A `PublishHandle` is returned from every call to [`Publisher::publish`][crate::client::Publisher::publish]
27///
28/// # Example
29///
30/// ```
31/// # use google_cloud_pubsub::client::Publisher;
32/// # use google_cloud_pubsub::model::PubsubMessage;
33/// # async fn sample(publisher: Publisher) -> anyhow::Result<()> {
34/// // publish() returns a handle immediately.
35/// let handle = publisher.publish(PubsubMessage::new().set_data("hello world"));
36///
37/// // The handle can be awaited later to get the result.
38/// match handle.await {
39///     Ok(message_id) => println!("Message published with ID: {message_id}"),
40///     Err(e) => eprintln!("Failed to publish message: {e:?}"),
41/// }
42/// # Ok(())
43/// # }
44/// ```
45pub struct PublishHandle {
46    pub(crate) rx: oneshot::Receiver<std::result::Result<String, crate::error::PublishError>>,
47}
48
49impl Future for PublishHandle {
50    /// The result of the publish operation.
51    /// - `Ok(String)`: The server-assigned message ID.
52    /// - `Err(Error)`: An error indicating the publish failed.
53    type Output = crate::Result<String>;
54
55    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
56        let result = ready!(Pin::new(&mut self.rx).poll(cx));
57        // An error will only occur if the sender of the self.rx was dropped,
58        // which would be a bug.
59        Poll::Ready(
60            result
61                .expect("the client library should not release the sender")
62                .map_err(convert_error),
63        )
64    }
65}
66
67fn convert_error(e: crate::error::PublishError) -> gax::error::Error {
68    // TODO(#3689): The error type for these are not ideal, we will need will
69    // need to handle error propagation better.
70    match e {
71        crate::error::PublishError::SendError(s) => gax::error::Error::io(s.clone()),
72        crate::error::PublishError::OrderingKeyPaused(()) => gax::error::Error::io(e),
73    }
74}