Skip to main content

google_cloud_pubsub/subscriber/
client.rs

1// Copyright 2026 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::builder::Subscribe;
16use super::client_builder::ClientBuilder;
17use super::transport::Transport;
18use crate::ClientBuilderResult as BuilderResult;
19use std::sync::Arc;
20
21/// A Subscriber client for the [Cloud Pub/Sub] API.
22///
23/// Use this client to receive messages from a [pull subscription] on a topic.
24///
25/// # Example
26/// ```
27/// # use google_cloud_pubsub::client::Subscriber;
28/// # async fn sample() -> anyhow::Result<()> {
29/// let client = Subscriber::builder().build().await?;
30/// let mut stream = client
31///     .subscribe("projects/my-project/subscriptions/my-subscription")
32///     .build();
33/// while let Some((m, h)) = stream.next().await.transpose()? {
34///     println!("Received message m={m:?}");
35///     h.ack();
36/// }
37/// # Ok(()) }
38/// ```
39///
40/// # Ordered Delivery
41///
42/// The subscriber returns messages in order if [ordered delivery] is enabled on
43/// the subscription. The client provides the same guarantees as the service.
44///
45/// For more details on how the service works, see:
46///
47/// - [Considerations when using ordered delivery][considerations]
48/// - [Google Cloud Pub/Sub Ordered Delivery][medium]
49///
50/// [considerations]: https://docs.cloud.google.com/pubsub/docs/ordering#considerations_when_using_ordered_messaging
51/// [medium]: https://medium.com/google-cloud/google-cloud-pub-sub-ordered-delivery-1e4181f60bc8
52/// [ordered delivery]: https://docs.cloud.google.com/pubsub/docs/ordering
53///
54/// # Exactly-once Delivery
55///
56/// The subscriber does **not** support [exactly-once] delivery yet.
57///
58/// If you subscribe to a subscription with exactly-once delivery enabled, the
59/// subscriber will deliver you messages with at-least-once semantics. There is
60/// no way for you to confirm the acknowledgements from the server. Messages may
61/// get redelivered.
62///
63/// Adding support for exactly-once delivery is planned. You can track the
64/// progress in [google-cloud-rust#3964].
65///
66/// [exactly-once]: https://docs.cloud.google.com/pubsub/docs/exactly-once-delivery
67/// [google-cloud-rust#3964]: https://github.com/googleapis/google-cloud-rust/issues/3964
68///
69/// # Configuration
70///
71/// To configure a `Subscriber` use the `with_*` methods in the type returned by
72/// [builder()][Subscriber::builder]. The default configuration should work for
73/// most applications. Common configuration changes include:
74///
75/// * [with_endpoint()]: by default this client uses the global default endpoint
76///   (`https://pubsub.googleapis.com`). Applications using regional endpoints
77///   or running in restricted networks (e.g. a network configured with
78///   [Private Google Access with VPC Service Controls]) may want to override
79///   this default.
80/// * [with_credentials()]: by default this client uses
81///   [Application Default Credentials]. Applications using custom
82///   authentication may need to override this default.
83///
84/// # Pooling and Cloning
85///
86/// `Subscriber` holds a connection pool internally, it is advised to
87/// create one and then reuse it.  You do not need to wrap `Subscriber` in
88/// an [Rc](std::rc::Rc) or [Arc] to reuse it, because it already uses an `Arc`
89/// internally.
90///
91/// [application default credentials]: https://cloud.google.com/docs/authentication#adc
92/// [cloud pub/sub]: https://docs.cloud.google.com/pubsub/docs/overview
93/// [private google access with vpc service controls]: https://cloud.google.com/vpc-service-controls/docs/private-connectivity
94/// [pull subscription]: https://docs.cloud.google.com/pubsub/docs/pull
95/// [with_endpoint()]: ClientBuilder::with_endpoint
96/// [with_credentials()]: ClientBuilder::with_credentials
97#[derive(Clone, Debug)]
98pub struct Subscriber {
99    inner: Arc<Transport>,
100    client_id: String,
101    grpc_subchannel_count: usize,
102}
103
104impl Subscriber {
105    /// Returns a builder for [Subscriber].
106    ///
107    /// # Example
108    /// ```
109    /// # use google_cloud_pubsub::client::Subscriber;
110    /// # async fn sample() -> anyhow::Result<()> {
111    /// let client = Subscriber::builder().build().await?;
112    /// # Ok(()) }
113    /// ```
114    pub fn builder() -> ClientBuilder {
115        ClientBuilder::new()
116    }
117
118    /// Receive messages from a [subscription].
119    ///
120    /// The `subscription` is the full name, in the format of
121    /// `projects/*/subscriptions/*`.
122    ///
123    /// # Example
124    /// ```
125    /// # use google_cloud_pubsub::client::Subscriber;
126    /// # async fn sample(client: Subscriber) -> anyhow::Result<()> {
127    /// let mut stream = client
128    ///     .subscribe("projects/my-project/subscriptions/my-subscription")
129    ///     .build();
130    /// while let Some((m, h)) = stream.next().await.transpose()? {
131    ///     println!("Received message m={m:?}");
132    ///     h.ack();
133    /// }
134    /// # Ok(()) }
135    /// ```
136    ///
137    /// [subscription]: https://docs.cloud.google.com/pubsub/docs/subscription-overview
138    pub fn subscribe<T>(&self, subscription: T) -> Subscribe
139    where
140        T: Into<String>,
141    {
142        Subscribe::new(
143            self.inner.clone(),
144            subscription.into(),
145            self.client_id.clone(),
146            self.grpc_subchannel_count,
147        )
148    }
149
150    pub(super) async fn new(builder: ClientBuilder) -> BuilderResult<Self> {
151        let grpc_subchannel_count =
152            std::cmp::max(1, builder.config.grpc_subchannel_count.unwrap_or(1));
153        let transport = Transport::new(builder.config).await?;
154        Ok(Self {
155            inner: Arc::new(transport),
156            client_id: uuid::Uuid::new_v4().to_string(),
157            grpc_subchannel_count,
158        })
159    }
160}
161
#[cfg(test)]
mod tests {
    use super::*;
    use gaxi::grpc::tonic::Status as TonicStatus;
    use google_cloud_auth::credentials::anonymous::Builder as Anonymous;
    use pubsub_grpc_mock::{MockSubscriber, start};

    /// The subscription name shared by the tests that call `subscribe()`.
    const SUBSCRIPTION: &str = "projects/p/subscriptions/s";

    /// A client can be built from an unmodified builder.
    #[tokio::test]
    async fn basic() -> anyhow::Result<()> {
        Subscriber::builder().build().await?;
        Ok(())
    }

    /// An error returned by the mock for the streaming pull is surfaced as the
    /// first item of the stream, with its status code and message preserved.
    #[tokio::test]
    async fn streaming_pull() -> anyhow::Result<()> {
        use google_cloud_gax::error::rpc::Code;

        let mut mock = MockSubscriber::new();
        mock.expect_streaming_pull()
            .return_once(|_| Err(TonicStatus::failed_precondition("fail")));
        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;

        let client = Subscriber::builder()
            .with_endpoint(endpoint)
            .with_credentials(Anonymous::new().build())
            .build()
            .await?;

        let mut stream = client.subscribe(SUBSCRIPTION).build();
        let first = stream.next().await.expect("stream should not be empty");
        let err = first.expect_err("the first streamed item should be an error");

        let Some(status) = err.status() else {
            panic!("{err:?}");
        };
        assert_eq!(status.code, Code::FailedPrecondition);
        assert_eq!(status.message, "fail");

        Ok(())
    }

    /// The subchannel count defaults to 1, is never lower than 1, and is
    /// propagated to the `Subscribe` builder.
    #[tokio::test]
    async fn grpc_subchannel_count() -> anyhow::Result<()> {
        let anonymous = || Anonymous::new().build();

        // Not configured: the client falls back to 1.
        let defaulted = Subscriber::builder()
            .with_credentials(anonymous())
            .build()
            .await?;
        assert_eq!(defaulted.grpc_subchannel_count, 1);

        // Configured as zero: the client raises the value to 1.
        let clamped = Subscriber::builder()
            .with_credentials(anonymous())
            .with_grpc_subchannel_count(0)
            .build()
            .await?;
        assert_eq!(clamped.grpc_subchannel_count, 1);

        // Configured with a positive value: the client keeps it as is...
        let explicit = Subscriber::builder()
            .with_credentials(anonymous())
            .with_grpc_subchannel_count(8)
            .build()
            .await?;
        assert_eq!(explicit.grpc_subchannel_count, 8);

        // ... and passes it on to the builders it creates.
        let subscribe = explicit.subscribe(SUBSCRIPTION);
        assert_eq!(subscribe.grpc_subchannel_count, 8);

        Ok(())
    }
}