Skip to main content

google_cloud_pubsub/subscriber/
client.rs

1// Copyright 2026 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::builder::Subscribe;
16use super::client_builder::ClientBuilder;
17use super::transport::Transport;
18use crate::ClientBuilderResult as BuilderResult;
19use std::sync::Arc;
20
21/// A Subscriber client for the [Cloud Pub/Sub] API.
22///
23/// Use this client to receive messages from a [pull subscription] on a topic.
24///
25/// # Example
26/// ```
27/// # use google_cloud_pubsub::client::Subscriber;
28/// # async fn sample() -> anyhow::Result<()> {
29/// let client = Subscriber::builder().build().await?;
30/// let mut stream = client
31///     .subscribe("projects/my-project/subscriptions/my-subscription")
32///     .build();
33/// while let Some((m, h)) = stream.next().await.transpose()? {
34///     println!("Received message m={m:?}");
35///     h.ack();
36/// }
37/// # Ok(()) }
38/// ```
39///
40/// # Ordered Delivery
41///
42/// The subscriber returns messages in order if [ordered delivery] is enabled on
43/// the subscription. The client provides the same guarantees as the service.
44///
45/// For more details on how the service works, see:
46///
47/// - [Considerations when using ordered delivery][considerations]
48/// - [Google Cloud Pub/Sub Ordered Delivery][medium]
49///
50/// [considerations]: https://docs.cloud.google.com/pubsub/docs/ordering#considerations_when_using_ordered_messaging
51/// [medium]: https://medium.com/google-cloud/google-cloud-pub-sub-ordered-delivery-1e4181f60bc8
52/// [ordered delivery]: https://docs.cloud.google.com/pubsub/docs/ordering
53///
54/// # Exactly-once Delivery
55///
56/// The subscriber supports [exactly-once] delivery.
57///
58/// If you enable exactly-once delivery for a subscription, your application
59/// can be opinionated about the delivery type, by destructuring the handler
60/// into its [`Handler::ExactlyOnce`][eo-branch] branch.
61///
62/// ```
63/// use google_cloud_pubsub::subscriber::MessageStream;
64/// use google_cloud_pubsub::subscriber::handler::Handler;
65/// async fn exactly_once_stream(mut stream: MessageStream) -> anyhow::Result<()> {
66///   while let Some((m, Handler::ExactlyOnce(h))) = stream.next().await.transpose()? {
67///       println!("Received message m={m:?}");
68///
69///       // Await the result of the ack. Typically you would not block the loop
70///       // with an `await` point like this.
71///       h.confirmed_ack().await?;
72///   }
73///   unreachable!("Oops, my subscription must have at-least-once semantics")
74/// }
75/// ```
76///
77/// You should not change the delivery type of a subscription midstream. If you
78/// do, the subscriber will honor the delivery setting at the time each message
79/// was received.
80///
81/// [exactly-once]: https://docs.cloud.google.com/pubsub/docs/exactly-once-delivery
82///
83/// # Configuration
84///
85/// To configure a `Subscriber` use the `with_*` methods in the type returned by
86/// [builder()][Subscriber::builder]. The default configuration should work for
87/// most applications. Common configuration changes include:
88///
89/// * [with_endpoint()]: by default this client uses the global default endpoint
90///   (`https://pubsub.googleapis.com`). Applications using regional endpoints
91///   or running in restricted networks (e.g. a network configured with
92///   [Private Google Access with VPC Service Controls]) may want to override
93///   this default.
94/// * [with_credentials()]: by default this client uses
95///   [Application Default Credentials]. Applications using custom
96///   authentication may need to override this default.
97///
98/// # Pooling and Cloning
99///
100/// `Subscriber` holds a connection pool internally, it is advised to
101/// create one and then reuse it.  You do not need to wrap `Subscriber` in
102/// an [Rc](std::rc::Rc) or [Arc] to reuse it, because it already uses an `Arc`
103/// internally.
104///
105/// [application default credentials]: https://cloud.google.com/docs/authentication#adc
106/// [cloud pub/sub]: https://docs.cloud.google.com/pubsub/docs/overview
107/// [eo-branch]: crate::subscriber::handler::Handler::ExactlyOnce
108/// [private google access with vpc service controls]: https://cloud.google.com/vpc-service-controls/docs/private-connectivity
109/// [pull subscription]: https://docs.cloud.google.com/pubsub/docs/pull
110/// [with_endpoint()]: ClientBuilder::with_endpoint
111/// [with_credentials()]: ClientBuilder::with_credentials
112#[derive(Clone, Debug)]
113pub struct Subscriber {
114    inner: Arc<Transport>,
115    client_id: String,
116    grpc_subchannel_count: usize,
117}
118
119impl Subscriber {
120    /// Returns a builder for [Subscriber].
121    ///
122    /// # Example
123    /// ```
124    /// # use google_cloud_pubsub::client::Subscriber;
125    /// # async fn sample() -> anyhow::Result<()> {
126    /// let client = Subscriber::builder().build().await?;
127    /// # Ok(()) }
128    /// ```
129    pub fn builder() -> ClientBuilder {
130        ClientBuilder::new()
131    }
132
133    /// Receive messages from a [subscription].
134    ///
135    /// The `subscription` is the full name, in the format of
136    /// `projects/*/subscriptions/*`.
137    ///
138    /// # Example
139    /// ```
140    /// # use google_cloud_pubsub::client::Subscriber;
141    /// # async fn sample(client: Subscriber) -> anyhow::Result<()> {
142    /// let mut stream = client
143    ///     .subscribe("projects/my-project/subscriptions/my-subscription")
144    ///     .build();
145    /// while let Some((m, h)) = stream.next().await.transpose()? {
146    ///     println!("Received message m={m:?}");
147    ///     h.ack();
148    /// }
149    /// # Ok(()) }
150    /// ```
151    ///
152    /// [subscription]: https://docs.cloud.google.com/pubsub/docs/subscription-overview
153    pub fn subscribe<T>(&self, subscription: T) -> Subscribe
154    where
155        T: Into<String>,
156    {
157        Subscribe::new(
158            self.inner.clone(),
159            subscription.into(),
160            self.client_id.clone(),
161            self.grpc_subchannel_count,
162        )
163    }
164
165    pub(super) async fn new(builder: ClientBuilder) -> BuilderResult<Self> {
166        let grpc_subchannel_count =
167            std::cmp::max(1, builder.config.grpc_subchannel_count.unwrap_or(1));
168        let transport = Transport::new(builder.config).await?;
169        Ok(Self {
170            inner: Arc::new(transport),
171            client_id: uuid::Uuid::new_v4().to_string(),
172            grpc_subchannel_count,
173        })
174    }
175}
176
177#[cfg(test)]
178mod tests {
179    use super::*;
180    use gaxi::grpc::tonic::Status as TonicStatus;
181    use google_cloud_auth::credentials::anonymous::Builder as Anonymous;
182    use pubsub_grpc_mock::{MockSubscriber, start};
183
184    #[tokio::test]
185    async fn basic() -> anyhow::Result<()> {
186        let _ = Subscriber::builder().build().await?;
187        Ok(())
188    }
189
190    #[tokio::test]
191    async fn streaming_pull() -> anyhow::Result<()> {
192        let mut mock = MockSubscriber::new();
193        mock.expect_streaming_pull()
194            .return_once(|_| Err(TonicStatus::failed_precondition("fail")));
195        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
196        let client = Subscriber::builder()
197            .with_endpoint(endpoint)
198            .with_credentials(Anonymous::new().build())
199            .build()
200            .await?;
201        let err = client
202            .subscribe("projects/p/subscriptions/s")
203            .build()
204            .next()
205            .await
206            .expect("stream should not be empty")
207            .expect_err("the first streamed item should be an error");
208        assert!(err.status().is_some(), "{err:?}");
209        let status = err.status().unwrap();
210        assert_eq!(
211            status.code,
212            google_cloud_gax::error::rpc::Code::FailedPrecondition
213        );
214        assert_eq!(status.message, "fail");
215
216        Ok(())
217    }
218
219    #[tokio::test]
220    async fn grpc_subchannel_count() -> anyhow::Result<()> {
221        let client = Subscriber::builder()
222            .with_credentials(Anonymous::new().build())
223            .build()
224            .await?;
225        assert_eq!(client.grpc_subchannel_count, 1);
226
227        let client = Subscriber::builder()
228            .with_credentials(Anonymous::new().build())
229            .with_grpc_subchannel_count(0)
230            .build()
231            .await?;
232        assert_eq!(client.grpc_subchannel_count, 1);
233
234        let client = Subscriber::builder()
235            .with_credentials(Anonymous::new().build())
236            .with_grpc_subchannel_count(8)
237            .build()
238            .await?;
239        assert_eq!(client.grpc_subchannel_count, 8);
240
241        let builder = client.subscribe("projects/p/subscriptions/s");
242        assert_eq!(builder.grpc_subchannel_count, 8);
243
244        Ok(())
245    }
246}