google_cloud_pubsub/subscriber/client.rs
1// Copyright 2026 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::builder::Subscribe;
16use super::client_builder::ClientBuilder;
17use super::transport::Transport;
18use crate::ClientBuilderResult as BuilderResult;
19use std::sync::Arc;
20
21/// A Subscriber client for the [Cloud Pub/Sub] API.
22///
23/// Use this client to receive messages from a [pull subscription] on a topic.
24///
25/// # Example
26/// ```
27/// # use google_cloud_pubsub::client::Subscriber;
28/// # async fn sample() -> anyhow::Result<()> {
29/// let client = Subscriber::builder().build().await?;
30/// let mut stream = client
31/// .subscribe("projects/my-project/subscriptions/my-subscription")
32/// .build();
33/// while let Some((m, h)) = stream.next().await.transpose()? {
34/// println!("Received message m={m:?}");
35/// h.ack();
36/// }
37/// # Ok(()) }
38/// ```
39///
40/// # Ordered Delivery
41///
42/// The subscriber returns messages in order if [ordered delivery] is enabled on
43/// the subscription. The client provides the same guarantees as the service.
44///
45/// For more details on how the service works, see:
46///
47/// - [Considerations when using ordered delivery][considerations]
48/// - [Google Cloud Pub/Sub Ordered Delivery][medium]
49///
50/// [considerations]: https://docs.cloud.google.com/pubsub/docs/ordering#considerations_when_using_ordered_messaging
51/// [medium]: https://medium.com/google-cloud/google-cloud-pub-sub-ordered-delivery-1e4181f60bc8
52/// [ordered delivery]: https://docs.cloud.google.com/pubsub/docs/ordering
53///
54/// # Exactly-once Delivery
55///
56/// The subscriber does **not** support [exactly-once] delivery yet.
57///
58/// If you subscribe to a subscription with exactly-once delivery enabled, the
59/// subscriber will deliver you messages with at-least-once semantics. There is
60/// no way for you to confirm the acknowledgements from the server. Messages may
61/// get redelivered.
62///
63/// Adding support for exactly-once delivery is planned. You can track the
64/// progress in [google-cloud-rust#3964].
65///
66/// [exactly-once]: https://docs.cloud.google.com/pubsub/docs/exactly-once-delivery
67/// [google-cloud-rust#3964]: https://github.com/googleapis/google-cloud-rust/issues/3964
68///
69/// # Configuration
70///
71/// To configure a `Subscriber` use the `with_*` methods in the type returned by
72/// [builder()][Subscriber::builder]. The default configuration should work for
73/// most applications. Common configuration changes include:
74///
75/// * [with_endpoint()]: by default this client uses the global default endpoint
76/// (`https://pubsub.googleapis.com`). Applications using regional endpoints
77/// or running in restricted networks (e.g. a network configured with
78/// [Private Google Access with VPC Service Controls]) may want to override
79/// this default.
80/// * [with_credentials()]: by default this client uses
81/// [Application Default Credentials]. Applications using custom
82/// authentication may need to override this default.
83///
84/// # Pooling and Cloning
85///
86/// `Subscriber` holds a connection pool internally, it is advised to
87/// create one and then reuse it. You do not need to wrap `Subscriber` in
88/// an [Rc](std::rc::Rc) or [Arc] to reuse it, because it already uses an `Arc`
89/// internally.
90///
91/// [application default credentials]: https://cloud.google.com/docs/authentication#adc
92/// [cloud pub/sub]: https://docs.cloud.google.com/pubsub/docs/overview
93/// [private google access with vpc service controls]: https://cloud.google.com/vpc-service-controls/docs/private-connectivity
94/// [pull subscription]: https://docs.cloud.google.com/pubsub/docs/pull
95/// [with_endpoint()]: ClientBuilder::with_endpoint
96/// [with_credentials()]: ClientBuilder::with_credentials
97#[derive(Clone, Debug)]
98pub struct Subscriber {
99 inner: Arc<Transport>,
100 client_id: String,
101 grpc_subchannel_count: usize,
102}
103
104impl Subscriber {
105 /// Returns a builder for [Subscriber].
106 ///
107 /// # Example
108 /// ```
109 /// # use google_cloud_pubsub::client::Subscriber;
110 /// # async fn sample() -> anyhow::Result<()> {
111 /// let client = Subscriber::builder().build().await?;
112 /// # Ok(()) }
113 /// ```
114 pub fn builder() -> ClientBuilder {
115 ClientBuilder::new()
116 }
117
118 /// Receive messages from a [subscription].
119 ///
120 /// The `subscription` is the full name, in the format of
121 /// `projects/*/subscriptions/*`.
122 ///
123 /// # Example
124 /// ```
125 /// # use google_cloud_pubsub::client::Subscriber;
126 /// # async fn sample(client: Subscriber) -> anyhow::Result<()> {
127 /// let mut stream = client
128 /// .subscribe("projects/my-project/subscriptions/my-subscription")
129 /// .build();
130 /// while let Some((m, h)) = stream.next().await.transpose()? {
131 /// println!("Received message m={m:?}");
132 /// h.ack();
133 /// }
134 /// # Ok(()) }
135 /// ```
136 ///
137 /// [subscription]: https://docs.cloud.google.com/pubsub/docs/subscription-overview
138 pub fn subscribe<T>(&self, subscription: T) -> Subscribe
139 where
140 T: Into<String>,
141 {
142 Subscribe::new(
143 self.inner.clone(),
144 subscription.into(),
145 self.client_id.clone(),
146 self.grpc_subchannel_count,
147 )
148 }
149
150 pub(super) async fn new(builder: ClientBuilder) -> BuilderResult<Self> {
151 let grpc_subchannel_count =
152 std::cmp::max(1, builder.config.grpc_subchannel_count.unwrap_or(1));
153 let transport = Transport::new(builder.config).await?;
154 Ok(Self {
155 inner: Arc::new(transport),
156 client_id: uuid::Uuid::new_v4().to_string(),
157 grpc_subchannel_count,
158 })
159 }
160}
161
#[cfg(test)]
mod tests {
    use super::*;
    use gaxi::grpc::tonic::Status as TonicStatus;
    use google_cloud_auth::credentials::anonymous::Builder as Anonymous;
    use pubsub_grpc_mock::{MockSubscriber, start};

    // Building a client with the default configuration succeeds.
    #[tokio::test]
    async fn basic() -> anyhow::Result<()> {
        let _ = Subscriber::builder().build().await?;
        Ok(())
    }

    // An error returned by the mock's streaming pull shows up as the first
    // item of the stream, with its code and message intact.
    #[tokio::test]
    async fn streaming_pull() -> anyhow::Result<()> {
        use google_cloud_gax::error::rpc::Code;

        let mut mock = MockSubscriber::new();
        mock.expect_streaming_pull()
            .return_once(|_| Err(TonicStatus::failed_precondition("fail")));
        let (endpoint, _server) = start("0.0.0.0:0", mock).await?;

        let client = Subscriber::builder()
            .with_endpoint(endpoint)
            .with_credentials(Anonymous::new().build())
            .build()
            .await?;
        let mut stream = client.subscribe("projects/p/subscriptions/s").build();
        let first = stream.next().await.expect("stream should not be empty");
        let err = first.expect_err("the first streamed item should be an error");

        let Some(status) = err.status() else {
            panic!("{err:?}");
        };
        assert_eq!(status.code, Code::FailedPrecondition);
        assert_eq!(status.message, "fail");

        Ok(())
    }

    // The subchannel count defaults to 1, is clamped to at least 1, and is
    // passed on to the `Subscribe` builder.
    #[tokio::test]
    async fn grpc_subchannel_count() -> anyhow::Result<()> {
        let unset = Subscriber::builder()
            .with_credentials(Anonymous::new().build())
            .build()
            .await?;
        assert_eq!(unset.grpc_subchannel_count, 1);

        let zero = Subscriber::builder()
            .with_credentials(Anonymous::new().build())
            .with_grpc_subchannel_count(0)
            .build()
            .await?;
        assert_eq!(zero.grpc_subchannel_count, 1);

        let eight = Subscriber::builder()
            .with_credentials(Anonymous::new().build())
            .with_grpc_subchannel_count(8)
            .build()
            .await?;
        assert_eq!(eight.grpc_subchannel_count, 8);

        let subscribe = eight.subscribe("projects/p/subscriptions/s");
        assert_eq!(subscribe.grpc_subchannel_count, 8);

        Ok(())
    }
}