google_cloud_pubsub/subscriber/client.rs
1// Copyright 2026 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::builder::Subscribe;
16use super::client_builder::ClientBuilder;
17use super::transport::Transport;
18use crate::ClientBuilderResult as BuilderResult;
19use std::sync::Arc;
20
21/// A Subscriber client for the [Cloud Pub/Sub] API.
22///
23/// Use this client to receive messages from a [pull subscription] on a topic.
24///
25/// # Example
26/// ```
27/// # use google_cloud_pubsub::client::Subscriber;
28/// # async fn sample() -> anyhow::Result<()> {
29/// let client = Subscriber::builder().build().await?;
30/// let mut stream = client
31/// .subscribe("projects/my-project/subscriptions/my-subscription")
32/// .build();
33/// while let Some((m, h)) = stream.next().await.transpose()? {
34/// println!("Received message m={m:?}");
35/// h.ack();
36/// }
37/// # Ok(()) }
38/// ```
39///
40/// # Ordered Delivery
41///
42/// The subscriber returns messages in order if [ordered delivery] is enabled on
43/// the subscription. The client provides the same guarantees as the service.
44///
45/// For more details on how the service works, see:
46///
47/// - [Considerations when using ordered delivery][considerations]
48/// - [Google Cloud Pub/Sub Ordered Delivery][medium]
49///
50/// [considerations]: https://docs.cloud.google.com/pubsub/docs/ordering#considerations_when_using_ordered_messaging
51/// [medium]: https://medium.com/google-cloud/google-cloud-pub-sub-ordered-delivery-1e4181f60bc8
52/// [ordered delivery]: https://docs.cloud.google.com/pubsub/docs/ordering
53///
54/// # Exactly-once Delivery
55///
56/// The subscriber supports [exactly-once] delivery.
57///
58/// If you enable exactly-once delivery for a subscription, your application
59/// can be opinionated about the delivery type, by destructuring the handler
60/// into its [`Handler::ExactlyOnce`][eo-branch] branch.
61///
62/// ```
63/// use google_cloud_pubsub::subscriber::MessageStream;
64/// use google_cloud_pubsub::subscriber::handler::Handler;
65/// async fn exactly_once_stream(mut stream: MessageStream) -> anyhow::Result<()> {
66/// while let Some((m, Handler::ExactlyOnce(h))) = stream.next().await.transpose()? {
67/// println!("Received message m={m:?}");
68///
69/// // Await the result of the ack. Typically you would not block the loop
70/// // with an `await` point like this.
71/// h.confirmed_ack().await?;
72/// }
73/// unreachable!("Oops, my subscription must have at-least-once semantics")
74/// }
75/// ```
76///
77/// You should not change the delivery type of a subscription midstream. If you
78/// do, the subscriber will honor the delivery setting at the time each message
79/// was received.
80///
81/// [exactly-once]: https://docs.cloud.google.com/pubsub/docs/exactly-once-delivery
82///
83/// # Configuration
84///
85/// To configure a `Subscriber` use the `with_*` methods in the type returned by
86/// [builder()][Subscriber::builder]. The default configuration should work for
87/// most applications. Common configuration changes include:
88///
89/// * [with_endpoint()]: by default this client uses the global default endpoint
90/// (`https://pubsub.googleapis.com`). Applications using regional endpoints
91/// or running in restricted networks (e.g. a network configured with
92/// [Private Google Access with VPC Service Controls]) may want to override
93/// this default.
94/// * [with_credentials()]: by default this client uses
95/// [Application Default Credentials]. Applications using custom
96/// authentication may need to override this default.
97///
98/// # Pooling and Cloning
99///
100/// `Subscriber` holds a connection pool internally, it is advised to
101/// create one and then reuse it. You do not need to wrap `Subscriber` in
102/// an [Rc](std::rc::Rc) or [Arc] to reuse it, because it already uses an `Arc`
103/// internally.
104///
105/// [application default credentials]: https://cloud.google.com/docs/authentication#adc
106/// [cloud pub/sub]: https://docs.cloud.google.com/pubsub/docs/overview
107/// [eo-branch]: crate::subscriber::handler::Handler::ExactlyOnce
108/// [private google access with vpc service controls]: https://cloud.google.com/vpc-service-controls/docs/private-connectivity
109/// [pull subscription]: https://docs.cloud.google.com/pubsub/docs/pull
110/// [with_endpoint()]: ClientBuilder::with_endpoint
111/// [with_credentials()]: ClientBuilder::with_credentials
112#[derive(Clone, Debug)]
113pub struct Subscriber {
114 inner: Arc<Transport>,
115 client_id: String,
116 grpc_subchannel_count: usize,
117}
118
119impl Subscriber {
120 /// Returns a builder for [Subscriber].
121 ///
122 /// # Example
123 /// ```
124 /// # use google_cloud_pubsub::client::Subscriber;
125 /// # async fn sample() -> anyhow::Result<()> {
126 /// let client = Subscriber::builder().build().await?;
127 /// # Ok(()) }
128 /// ```
129 pub fn builder() -> ClientBuilder {
130 ClientBuilder::new()
131 }
132
133 /// Receive messages from a [subscription].
134 ///
135 /// The `subscription` is the full name, in the format of
136 /// `projects/*/subscriptions/*`.
137 ///
138 /// # Example
139 /// ```
140 /// # use google_cloud_pubsub::client::Subscriber;
141 /// # async fn sample(client: Subscriber) -> anyhow::Result<()> {
142 /// let mut stream = client
143 /// .subscribe("projects/my-project/subscriptions/my-subscription")
144 /// .build();
145 /// while let Some((m, h)) = stream.next().await.transpose()? {
146 /// println!("Received message m={m:?}");
147 /// h.ack();
148 /// }
149 /// # Ok(()) }
150 /// ```
151 ///
152 /// [subscription]: https://docs.cloud.google.com/pubsub/docs/subscription-overview
153 pub fn subscribe<T>(&self, subscription: T) -> Subscribe
154 where
155 T: Into<String>,
156 {
157 Subscribe::new(
158 self.inner.clone(),
159 subscription.into(),
160 self.client_id.clone(),
161 self.grpc_subchannel_count,
162 )
163 }
164
165 pub(super) async fn new(builder: ClientBuilder) -> BuilderResult<Self> {
166 let grpc_subchannel_count =
167 std::cmp::max(1, builder.config.grpc_subchannel_count.unwrap_or(1));
168 let transport = Transport::new(builder.config).await?;
169 Ok(Self {
170 inner: Arc::new(transport),
171 client_id: uuid::Uuid::new_v4().to_string(),
172 grpc_subchannel_count,
173 })
174 }
175}
176
177#[cfg(test)]
178mod tests {
179 use super::*;
180 use gaxi::grpc::tonic::Status as TonicStatus;
181 use google_cloud_auth::credentials::anonymous::Builder as Anonymous;
182 use pubsub_grpc_mock::{MockSubscriber, start};
183
184 #[tokio::test]
185 async fn basic() -> anyhow::Result<()> {
186 let _ = Subscriber::builder().build().await?;
187 Ok(())
188 }
189
190 #[tokio::test]
191 async fn streaming_pull() -> anyhow::Result<()> {
192 let mut mock = MockSubscriber::new();
193 mock.expect_streaming_pull()
194 .return_once(|_| Err(TonicStatus::failed_precondition("fail")));
195 let (endpoint, _server) = start("0.0.0.0:0", mock).await?;
196 let client = Subscriber::builder()
197 .with_endpoint(endpoint)
198 .with_credentials(Anonymous::new().build())
199 .build()
200 .await?;
201 let err = client
202 .subscribe("projects/p/subscriptions/s")
203 .build()
204 .next()
205 .await
206 .expect("stream should not be empty")
207 .expect_err("the first streamed item should be an error");
208 assert!(err.status().is_some(), "{err:?}");
209 let status = err.status().unwrap();
210 assert_eq!(
211 status.code,
212 google_cloud_gax::error::rpc::Code::FailedPrecondition
213 );
214 assert_eq!(status.message, "fail");
215
216 Ok(())
217 }
218
219 #[tokio::test]
220 async fn grpc_subchannel_count() -> anyhow::Result<()> {
221 let client = Subscriber::builder()
222 .with_credentials(Anonymous::new().build())
223 .build()
224 .await?;
225 assert_eq!(client.grpc_subchannel_count, 1);
226
227 let client = Subscriber::builder()
228 .with_credentials(Anonymous::new().build())
229 .with_grpc_subchannel_count(0)
230 .build()
231 .await?;
232 assert_eq!(client.grpc_subchannel_count, 1);
233
234 let client = Subscriber::builder()
235 .with_credentials(Anonymous::new().build())
236 .with_grpc_subchannel_count(8)
237 .build()
238 .await?;
239 assert_eq!(client.grpc_subchannel_count, 8);
240
241 let builder = client.subscribe("projects/p/subscriptions/s");
242 assert_eq!(builder.grpc_subchannel_count, 8);
243
244 Ok(())
245 }
246}